diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index 8e36bc57d3..1c7c6ec0c8 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -84,8 +84,7 @@ class AdminCmdServer(HomeServer):
class AdminCmdReplicationHandler(ReplicationClientHandler):
- @defer.inlineCallbacks
- def on_rdata(self, stream_name, token, rows):
+ async def on_rdata(self, stream_name, token, rows):
pass
def get_streams_to_replicate(self):
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index e82e0f11e3..2217d4a4fb 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -115,9 +115,8 @@ class ASReplicationHandler(ReplicationClientHandler):
super(ASReplicationHandler, self).__init__(hs.get_datastore())
self.appservice_handler = hs.get_application_service_handler()
- @defer.inlineCallbacks
- def on_rdata(self, stream_name, token, rows):
- yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
+ async def on_rdata(self, stream_name, token, rows):
+ await super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering()
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 3edfe19567..ca96da6a4a 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -62,6 +62,9 @@ from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
from synapse.rest.client.versions import VersionsRestServlet
from synapse.server import HomeServer
+from synapse.storage.data_stores.main.monthly_active_users import (
+ MonthlyActiveUsersWorkerStore,
+)
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -85,6 +88,7 @@ class ClientReaderSlavedStore(
SlavedTransactionStore,
SlavedProfileStore,
SlavedClientIpStore,
+ MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index d0ddbe38fc..58e5b354f6 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -56,6 +56,9 @@ from synapse.rest.client.v1.room import (
RoomStateEventRestServlet,
)
from synapse.server import HomeServer
+from synapse.storage.data_stores.main.monthly_active_users import (
+ MonthlyActiveUsersWorkerStore,
+)
from synapse.storage.data_stores.main.user_directory import UserDirectoryStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
@@ -81,6 +84,7 @@ class EventCreatorSlavedStore(
SlavedEventStore,
SlavedRegistrationStore,
RoomStore,
+ MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 311523e0ed..1f1cea1416 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -46,6 +46,9 @@ from synapse.replication.slave.storage.transactions import SlavedTransactionStor
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.key.v2 import KeyApiV2Resource
from synapse.server import HomeServer
+from synapse.storage.data_stores.main.monthly_active_users import (
+ MonthlyActiveUsersWorkerStore,
+)
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
@@ -66,6 +69,7 @@ class FederationReaderSlavedStore(
RoomStore,
DirectoryStore,
SlavedTransactionStore,
+ MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 83c436229c..38d11fdd0f 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -145,9 +145,8 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
self.send_handler = FederationSenderHandler(hs, self)
- @defer.inlineCallbacks
- def on_rdata(self, stream_name, token, rows):
- yield super(FederationSenderReplicationHandler, self).on_rdata(
+ async def on_rdata(self, stream_name, token, rows):
+ await super(FederationSenderReplicationHandler, self).on_rdata(
stream_name, token, rows
)
self.send_handler.process_replication_rows(stream_name, token, rows)
@@ -159,6 +158,13 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
args.update(self.send_handler.stream_positions())
return args
+ def on_remote_server_up(self, server: str):
+ """Called when get a new REMOTE_SERVER_UP command."""
+
+ # Let's wake up the transaction queue for the server in case we have
+ # pending stuff to send to it.
+ self.send_handler.wake_destination(server)
+
def start(config_options):
try:
@@ -206,7 +212,7 @@ class FederationSenderHandler(object):
to the federation sender.
"""
- def __init__(self, hs, replication_client):
+ def __init__(self, hs: FederationSenderServer, replication_client):
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
self.federation_sender = hs.get_federation_sender()
@@ -227,6 +233,9 @@ class FederationSenderHandler(object):
self.store.get_room_max_stream_ordering()
)
+ def wake_destination(self, server: str):
+ self.federation_sender.wake_destination(server)
+
def stream_positions(self):
return {"federation": self.federation_position}
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index e5b44a5eed..c2a334a2b0 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -31,7 +31,7 @@ from prometheus_client import Gauge
from twisted.application import service
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
-from twisted.web.resource import EncodingResourceWrapper, NoResource
+from twisted.web.resource import EncodingResourceWrapper, IResource, NoResource
from twisted.web.server import GzipEncoderFactory
from twisted.web.static import File
@@ -109,7 +109,16 @@ class SynapseHomeServer(HomeServer):
for path, resmodule in additional_resources.items():
handler_cls, config = load_module(resmodule)
handler = handler_cls(config, module_api)
- resources[path] = AdditionalResource(self, handler.handle_request)
+ if IResource.providedBy(handler):
+ resource = handler
+ elif hasattr(handler, "handle_request"):
+ resource = AdditionalResource(self, handler.handle_request)
+ else:
+ raise ConfigError(
+ "additional_resource %s does not implement a known interface"
+ % (resmodule["module"],)
+ )
+ resources[path] = resource
# try to find something useful to redirect '/' to
if WEB_CLIENT_PREFIX in resources:
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 09e639040a..e46b6ac598 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -141,9 +141,8 @@ class PusherReplicationHandler(ReplicationClientHandler):
self.pusher_pool = hs.get_pusherpool()
- @defer.inlineCallbacks
- def on_rdata(self, stream_name, token, rows):
- yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
+ async def on_rdata(self, stream_name, token, rows):
+ await super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)
@defer.inlineCallbacks
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 03031ee34d..8982c0676e 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -54,6 +54,9 @@ from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
from synapse.rest.client.v1.room import RoomInitialSyncRestServlet
from synapse.rest.client.v2_alpha import sync
from synapse.server import HomeServer
+from synapse.storage.data_stores.main.monthly_active_users import (
+ MonthlyActiveUsersWorkerStore,
+)
from synapse.storage.data_stores.main.presence import UserPresenceState
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.manhole import manhole
@@ -77,6 +80,7 @@ class SynchrotronSlavedStore(
SlavedEventStore,
SlavedClientIpStore,
RoomStore,
+ MonthlyActiveUsersWorkerStore,
BaseSlavedStore,
):
pass
@@ -358,9 +362,8 @@ class SyncReplicationHandler(ReplicationClientHandler):
self.presence_handler = hs.get_presence_handler()
self.notifier = hs.get_notifier()
- @defer.inlineCallbacks
- def on_rdata(self, stream_name, token, rows):
- yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
+ async def on_rdata(self, stream_name, token, rows):
+ await super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)
def get_streams_to_replicate(self):
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 1257098f92..ba536d6f04 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -172,9 +172,8 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
self.user_directory = hs.get_user_directory_handler()
- @defer.inlineCallbacks
- def on_rdata(self, stream_name, token, rows):
- yield super(UserDirectoryReplicationHandler, self).on_rdata(
+ async def on_rdata(self, stream_name, token, rows):
+ await super(UserDirectoryReplicationHandler, self).on_rdata(
stream_name, token, rows
)
if stream_name == EventsStream.NAME:
|