diff options
Diffstat (limited to 'synapse/app/synchrotron.py')
-rw-r--r-- | synapse/app/synchrotron.py | 42 |
1 files changed, 22 insertions, 20 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 6808d6d3e0..6662340797 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -17,6 +17,11 @@ import contextlib import logging import sys +from six import iteritems + +from twisted.internet import defer, reactor +from twisted.web.resource import NoResource + import synapse from synapse.api.constants import EventTypes from synapse.app import _base @@ -36,12 +41,12 @@ from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.filtering import SlavedFilteringStore +from synapse.replication.slave.storage.groups import SlavedGroupServerStore from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.groups import SlavedGroupServerStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v1 import events from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet @@ -50,16 +55,11 @@ from synapse.rest.client.v2_alpha import sync from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState -from synapse.storage.roommember import RoomMemberStore from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.stringutils import random_string from synapse.util.versionstring import get_version_string -from twisted.internet import defer, reactor -from twisted.web.resource import NoResource - -from six import iteritems logger = logging.getLogger("synapse.app.synchrotron") @@ -80,9 +80,7 @@ class SynchrotronSlavedStore( RoomStore, BaseSlavedStore, ): - did_forget = ( - RoomMemberStore.__dict__["did_forget"] - ) + pass UPDATE_SYNCING_USERS_MS = 10 * 1000 @@ -116,7 +114,10 @@ class SynchrotronPresence(object): logger.info("Presence process_id is %r", self.process_id) def send_user_sync(self, user_id, is_syncing, last_sync_ms): - self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms) + if self.hs.config.use_presence: + self.hs.get_tcp_replication().send_user_sync( + user_id, is_syncing, last_sync_ms + ) def mark_as_coming_online(self, user_id): """A user has started syncing. Send a UserSync to the master, unless they @@ -213,10 +214,13 @@ class SynchrotronPresence(object): yield self.notify_from_replication(states, stream_id) def get_currently_syncing_users(self): - return [ - user_id for user_id, count in iteritems(self.user_to_num_current_syncs) - if count > 0 - ] + if self.hs.config.use_presence: + return [ + user_id for user_id, count in iteritems(self.user_to_num_current_syncs) + if count > 0 + ] + else: + return set() class SynchrotronTyping(object): @@ -245,10 +249,7 @@ class SynchrotronApplicationService(object): class SynchrotronServer(HomeServer): - def setup(self): - logger.info("Setting up.") - self.datastore = SynchrotronSlavedStore(self.get_db_conn(), self) - logger.info("Finished setting up.") + DATASTORE_CLASS = SynchrotronSlavedStore def _listen_http(self, listener_config): port = listener_config["port"] @@ -305,7 +306,7 @@ class SynchrotronServer(HomeServer): elif listener["type"] == "metrics": if not self.get_config().enable_metrics: logger.warn(("Metrics listener configured, but " - "collect_metrics is not enabled!")) + "enable_metrics is not True!")) else: _base.listen_metrics(listener["bind_addresses"], listener["port"]) @@ -334,8 +335,9 @@ class SyncReplicationHandler(ReplicationClientHandler): self.presence_handler = hs.get_presence_handler() self.notifier = hs.get_notifier() + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) + yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) run_in_background(self.process_and_notify, stream_name, token, rows) def get_streams_to_replicate(self): |