summary refs log tree commit diff
path: root/synapse/app/pusher.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/pusher.py')
-rw-r--r--synapse/app/pusher.py24
1 files changed, 12 insertions, 12 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 244c604de9..83b0863f00 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -28,6 +28,7 @@ from synapse.config.logger import setup_logging
 from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage._base import __func__
 from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
@@ -49,31 +50,31 @@ class PusherSlaveStore(
     SlavedAccountDataStore
 ):
     update_pusher_last_stream_ordering_and_success = (
-        DataStore.update_pusher_last_stream_ordering_and_success.__func__
+        __func__(DataStore.update_pusher_last_stream_ordering_and_success)
     )
 
     update_pusher_failing_since = (
-        DataStore.update_pusher_failing_since.__func__
+        __func__(DataStore.update_pusher_failing_since)
     )
 
     update_pusher_last_stream_ordering = (
-        DataStore.update_pusher_last_stream_ordering.__func__
+        __func__(DataStore.update_pusher_last_stream_ordering)
     )
 
     get_throttle_params_by_room = (
-        DataStore.get_throttle_params_by_room.__func__
+        __func__(DataStore.get_throttle_params_by_room)
     )
 
     set_throttle_params = (
-        DataStore.set_throttle_params.__func__
+        __func__(DataStore.set_throttle_params)
     )
 
     get_time_of_last_push_action_before = (
-        DataStore.get_time_of_last_push_action_before.__func__
+        __func__(DataStore.get_time_of_last_push_action_before)
     )
 
     get_profile_displayname = (
-        DataStore.get_profile_displayname.__func__
+        __func__(DataStore.get_profile_displayname)
     )
 
 
@@ -160,11 +161,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
                     else:
                         yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
             elif stream_name == "events":
-                self.pusher_pool.on_new_notifications(
+                yield self.pusher_pool.on_new_notifications(
                     token, token,
                 )
             elif stream_name == "receipts":
-                self.pusher_pool.on_new_receipts(
+                yield self.pusher_pool.on_new_receipts(
                     token, token, set(row.room_id for row in rows)
                 )
         except Exception:
@@ -182,7 +183,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
     def start_pusher(self, user_id, app_id, pushkey):
         key = "%s:%s" % (app_id, pushkey)
         logger.info("Starting pusher %r / %r", user_id, key)
-        return self.pusher_pool._refresh_pusher(app_id, pushkey, user_id)
+        return self.pusher_pool.start_pusher_by_id(app_id, pushkey, user_id)
 
 
 def start(config_options):
@@ -191,7 +192,7 @@ def start(config_options):
             "Synapse pusher", config_options
         )
     except ConfigError as e:
-        sys.stderr.write("\n" + e.message + "\n")
+        sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
 
     assert config.worker_app == "synapse.app.pusher"
@@ -228,7 +229,6 @@ def start(config_options):
     def start():
         ps.get_pusherpool().start()
         ps.get_datastore().start_profiling()
-        ps.get_state_handler().start_caching()
 
     reactor.callWhenRunning(start)