diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 64df47f9cc..244c604de9 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -16,6 +16,9 @@
import logging
import sys
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
import synapse
from synapse import events
from synapse.app import _base
@@ -37,8 +40,6 @@ from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
-from twisted.internet import defer, reactor
-from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.pusher")
@@ -77,10 +78,7 @@ class PusherSlaveStore(
class PusherServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = PusherSlaveStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = PusherSlaveStore
def remove_pusher(self, app_id, push_key, user_id):
self.get_tcp_replication().send_remove_pusher(app_id, push_key, user_id)
@@ -128,7 +126,7 @@ class PusherServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@@ -147,8 +145,9 @@ class PusherReplicationHandler(ReplicationClientHandler):
self.pusher_pool = hs.get_pusherpool()
+ @defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
- super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
+ yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)
@defer.inlineCallbacks
@@ -161,11 +160,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
else:
yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
- yield self.pusher_pool.on_new_notifications(
+ self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
- yield self.pusher_pool.on_new_receipts(
+ self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
except Exception:
|