diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 6808d6d3e0..6662340797 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -17,6 +17,11 @@ import contextlib
import logging
import sys
+from six import iteritems
+
+from twisted.internet import defer, reactor
+from twisted.web.resource import NoResource
+
import synapse
from synapse.api.constants import EventTypes
from synapse.app import _base
@@ -36,12 +41,12 @@ from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
+from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.presence import SlavedPresenceStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.client.v1 import events
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
@@ -50,16 +55,11 @@ from synapse.rest.client.v2_alpha import sync
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
-from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.stringutils import random_string
from synapse.util.versionstring import get_version_string
-from twisted.internet import defer, reactor
-from twisted.web.resource import NoResource
-
-from six import iteritems
logger = logging.getLogger("synapse.app.synchrotron")
@@ -80,9 +80,7 @@ class SynchrotronSlavedStore(
RoomStore,
BaseSlavedStore,
):
- did_forget = (
- RoomMemberStore.__dict__["did_forget"]
- )
+ pass
UPDATE_SYNCING_USERS_MS = 10 * 1000
@@ -116,7 +114,10 @@ class SynchrotronPresence(object):
logger.info("Presence process_id is %r", self.process_id)
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
- self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
+ if self.hs.config.use_presence:
+ self.hs.get_tcp_replication().send_user_sync(
+ user_id, is_syncing, last_sync_ms
+ )
def mark_as_coming_online(self, user_id):
"""A user has started syncing. Send a UserSync to the master, unless they
@@ -213,10 +214,13 @@ class SynchrotronPresence(object):
yield self.notify_from_replication(states, stream_id)
def get_currently_syncing_users(self):
- return [
- user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
- if count > 0
- ]
+ if self.hs.config.use_presence:
+ return [
+ user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
+ if count > 0
+ ]
+ else:
+ return set()
class SynchrotronTyping(object):
@@ -245,10 +249,7 @@ class SynchrotronApplicationService(object):
class SynchrotronServer(HomeServer):
- def setup(self):
- logger.info("Setting up.")
- self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self)
- logger.info("Finished setting up.")
+ DATASTORE_CLASS = SynchrotronSlavedStore
def _listen_http(self, listener_config):
port = listener_config["port"]
@@ -305,7 +306,7 @@ class SynchrotronServer(HomeServer):
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
logger.warn(("Metrics listener configured, but "
- "collect_metrics is not enabled!"))
+ "enable_metrics is not True!"))
else:
_base.listen_metrics(listener["bind_addresses"],
listener["port"])
@@ -334,8 +335,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
self.presence_handler = hs.get_presence_handler()
self.notifier = hs.get_notifier()
+ @defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
- super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
+ yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)
def get_streams_to_replicate(self):
|