diff options
Diffstat (limited to 'synapse/app')
-rw-r--r-- | synapse/app/_base.py | 6 | ||||
-rw-r--r-- | synapse/app/appservice.py | 3 | ||||
-rw-r--r-- | synapse/app/client_reader.py | 6 | ||||
-rw-r--r-- | synapse/app/event_creator.py | 6 | ||||
-rw-r--r-- | synapse/app/federation_reader.py | 18 | ||||
-rw-r--r-- | synapse/app/federation_sender.py | 11 | ||||
-rw-r--r-- | synapse/app/frontend_proxy.py | 41 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 20 | ||||
-rw-r--r-- | synapse/app/media_repository.py | 6 | ||||
-rw-r--r-- | synapse/app/pusher.py | 7 | ||||
-rw-r--r-- | synapse/app/synchrotron.py | 19 | ||||
-rw-r--r-- | synapse/app/user_dir.py | 5 |
12 files changed, 118 insertions, 30 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 391bd14c5c..7c866e246a 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -140,7 +140,7 @@ def listen_metrics(bind_addresses, port): logger.info("Metrics now reporting on %s:%d", host, port) -def listen_tcp(bind_addresses, port, factory, backlog=50): +def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50): """ Create a TCP socket for a port and several addresses """ @@ -156,7 +156,9 @@ def listen_tcp(bind_addresses, port, factory, backlog=50): check_bind_error(e, address, bind_addresses) -def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50): +def listen_ssl( + bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50 +): """ Create an SSL socket for a port and several addresses """ diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 9a37384fb7..3348a8ec6d 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -117,8 +117,9 @@ class ASReplicationHandler(ReplicationClientHandler): super(ASReplicationHandler, self).__init__(hs.get_datastore()) self.appservice_handler = hs.get_application_service_handler() + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(ASReplicationHandler, self).on_rdata(stream_name, token, rows) + yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows) if stream_name == "events": max_stream_id = self.store.get_room_max_stream_ordering() diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index e2c91123db..ab79a45646 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -39,7 +39,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v1.room import ( JoinedRoomMemberListRestServlet, @@ -66,7 +66,7 @@ class ClientReaderSlavedStore( DirectoryStore, SlavedApplicationServiceStore, SlavedRegistrationStore, - TransactionStore, + SlavedTransactionStore, SlavedClientIpStore, BaseSlavedStore, ): @@ -168,11 +168,13 @@ def start(config_options): database_engine = create_engine(config.database_config) tls_server_context_factory = context_factory.ServerContextFactory(config) + tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config) ss = ClientReaderServer( config.server_name, db_config=config.database_config, tls_server_context_factory=tls_server_context_factory, + tls_client_options_factory=tls_client_options_factory, config=config, version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index 374f115644..03d39968a8 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -43,7 +43,7 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.client.v1.room import ( JoinRoomAliasServlet, @@ -63,7 +63,7 @@ logger = logging.getLogger("synapse.app.event_creator") class EventCreatorSlavedStore( DirectoryStore, - TransactionStore, + SlavedTransactionStore, SlavedProfileStore, SlavedAccountDataStore, SlavedPusherStore, @@ -174,11 +174,13 @@ def start(config_options): database_engine = create_engine(config.database_config) tls_server_context_factory = context_factory.ServerContextFactory(config) + tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config) ss = EventCreatorServer( config.server_name, db_config=config.database_config, tls_server_context_factory=tls_server_context_factory, + tls_client_options_factory=tls_client_options_factory, config=config, version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 7af00b8bcf..7d8105778d 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -32,11 +32,17 @@ from synapse.http.site import SynapseSite from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.account_data import SlavedAccountDataStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.directory import DirectoryStore from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.keys import SlavedKeyStore +from synapse.replication.slave.storage.profile import SlavedProfileStore +from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore +from synapse.replication.slave.storage.pushers import SlavedPusherStore +from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.room import RoomStore -from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -49,11 +55,17 @@ logger = logging.getLogger("synapse.app.federation_reader") class FederationReaderSlavedStore( + SlavedAccountDataStore, + SlavedProfileStore, + SlavedApplicationServiceStore, + SlavedPusherStore, + SlavedPushRuleStore, + SlavedReceiptsStore, SlavedEventStore, SlavedKeyStore, RoomStore, DirectoryStore, - TransactionStore, + SlavedTransactionStore, BaseSlavedStore, ): pass @@ -143,11 +155,13 @@ def start(config_options): database_engine = create_engine(config.database_config) tls_server_context_factory = context_factory.ServerContextFactory(config) + tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config) ss = FederationReaderServer( config.server_name, db_config=config.database_config, tls_server_context_factory=tls_server_context_factory, + tls_client_options_factory=tls_client_options_factory, config=config, version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 18469013fa..d59007099b 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -36,11 +36,11 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.receipts import SlavedReceiptsStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine -from synapse.util.async import Linearizer +from synapse.util.async_helpers import Linearizer from synapse.util.httpresourcetree import create_resource_tree from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole @@ -50,7 +50,7 @@ logger = logging.getLogger("synapse.app.federation_sender") class FederationSenderSlaveStore( - SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore, + SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore, SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore, ): def __init__(self, db_conn, hs): @@ -144,8 +144,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler): super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore()) self.send_handler = FederationSenderHandler(hs, self) + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(FederationSenderReplicationHandler, self).on_rdata( + yield super(FederationSenderReplicationHandler, self).on_rdata( stream_name, token, rows ) self.send_handler.process_replication_rows(stream_name, token, rows) @@ -186,11 +187,13 @@ def start(config_options): config.send_federation = True tls_server_context_factory = context_factory.ServerContextFactory(config) + tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config) ps = FederationSenderServer( config.server_name, db_config=config.database_config, tls_server_context_factory=tls_server_context_factory, + tls_client_options_factory=tls_client_options_factory, config=config, version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index b5f78f4640..8d484c1cd4 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -38,6 +38,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.devices import SlavedDeviceStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns from synapse.rest.client.v2_alpha._base import client_v2_patterns from synapse.server import HomeServer from synapse.storage.engines import create_engine @@ -49,6 +50,35 @@ from synapse.util.versionstring import get_version_string logger = logging.getLogger("synapse.app.frontend_proxy") +class PresenceStatusStubServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status") + + def __init__(self, hs): + super(PresenceStatusStubServlet, self).__init__(hs) + self.http_client = hs.get_simple_http_client() + self.auth = hs.get_auth() + self.main_uri = hs.config.worker_main_http_uri + + @defer.inlineCallbacks + def on_GET(self, request, user_id): + # Pass through the auth headers, if any, in case the access token + # is there. + auth_headers = request.requestHeaders.getRawHeaders("Authorization", []) + headers = { + "Authorization": auth_headers, + } + result = yield self.http_client.get_json( + self.main_uri + request.uri, + headers=headers, + ) + defer.returnValue((200, result)) + + @defer.inlineCallbacks + def on_PUT(self, request, user_id): + yield self.auth.get_user_by_req(request) + defer.returnValue((200, {})) + + class KeyUploadServlet(RestServlet): PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$") @@ -135,6 +165,12 @@ class FrontendProxyServer(HomeServer): elif name == "client": resource = JsonResource(self, canonical_json=False) KeyUploadServlet(self).register(resource) + + # If presence is disabled, use the stub servlet that does + # not allow sending presence + if not self.config.use_presence: + PresenceStatusStubServlet(self).register(resource) + resources.update({ "/_matrix/client/r0": resource, "/_matrix/client/unstable": resource, @@ -153,7 +189,8 @@ class FrontendProxyServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), + reactor=self.get_reactor() ) logger.info("Synapse client reader now listening on port %d", port) @@ -208,11 +245,13 @@ def start(config_options): database_engine = create_engine(config.database_config) tls_server_context_factory = context_factory.ServerContextFactory(config) + tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config) ss = FrontendProxyServer( config.server_name, db_config=config.database_config, tls_server_context_factory=tls_server_context_factory, + tls_client_options_factory=tls_client_options_factory, config=config, version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index fba51c26e8..005921dcf7 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -303,8 +303,8 @@ class SynapseHomeServer(HomeServer): # Gauges to expose monthly active user control metrics -current_mau_gauge = Gauge("synapse_admin_current_mau", "Current MAU") -max_mau_value_gauge = Gauge("synapse_admin_max_mau_value", "MAU Limit") +current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU") +max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit") def setup(config_options): @@ -338,6 +338,7 @@ def setup(config_options): events.USE_FROZEN_DICTS = config.use_frozen_dicts tls_server_context_factory = context_factory.ServerContextFactory(config) + tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config) database_engine = create_engine(config.database_config) config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection @@ -346,6 +347,7 @@ def setup(config_options): config.server_name, db_config=config.database_config, tls_server_context_factory=tls_server_context_factory, + tls_client_options_factory=tls_client_options_factory, config=config, version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, @@ -519,17 +521,27 @@ def run(hs): # table will decrease clock.looping_call(generate_user_daily_visit_stats, 5 * 60 * 1000) + # monthly active user limiting functionality + clock.looping_call( + hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60 + ) + hs.get_datastore().reap_monthly_active_users() + @defer.inlineCallbacks def generate_monthly_active_users(): count = 0 if hs.config.limit_usage_by_mau: - count = yield hs.get_datastore().count_monthly_users() + count = yield hs.get_datastore().get_monthly_active_count() current_mau_gauge.set(float(count)) - max_mau_value_gauge.set(float(hs.config.max_mau_value)) + max_mau_gauge.set(float(hs.config.max_mau_value)) + hs.get_datastore().initialise_reserved_users( + hs.config.mau_limits_reserved_threepids + ) generate_monthly_active_users() if hs.config.limit_usage_by_mau: clock.looping_call(generate_monthly_active_users, 5 * 60 * 1000) + # End of monthly active user settings if hs.config.report_stats: logger.info("Scheduling stats reporting for 3 hour intervals") diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index 749bbf37d0..fd1f6cbf7e 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -34,7 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore from synapse.replication.slave.storage.client_ips import SlavedClientIpStore from synapse.replication.slave.storage.registration import SlavedRegistrationStore -from synapse.replication.slave.storage.transactions import TransactionStore +from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.replication.tcp.client import ReplicationClientHandler from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.server import HomeServer @@ -52,7 +52,7 @@ class MediaRepositorySlavedStore( SlavedApplicationServiceStore, SlavedRegistrationStore, SlavedClientIpStore, - TransactionStore, + SlavedTransactionStore, BaseSlavedStore, MediaRepositoryStore, ): @@ -155,11 +155,13 @@ def start(config_options): database_engine = create_engine(config.database_config) tls_server_context_factory = context_factory.ServerContextFactory(config) + tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config) ss = MediaRepositoryServer( config.server_name, db_config=config.database_config, tls_server_context_factory=tls_server_context_factory, + tls_client_options_factory=tls_client_options_factory, config=config, version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index 9295a51d5b..a4fc7e91fa 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -148,8 +148,9 @@ class PusherReplicationHandler(ReplicationClientHandler): self.pusher_pool = hs.get_pusherpool() + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) + yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) run_in_background(self.poke_pushers, stream_name, token, rows) @defer.inlineCallbacks @@ -162,11 +163,11 @@ class PusherReplicationHandler(ReplicationClientHandler): else: yield self.start_pusher(row.user_id, row.app_id, row.pushkey) elif stream_name == "events": - yield self.pusher_pool.on_new_notifications( + self.pusher_pool.on_new_notifications( token, token, ) elif stream_name == "receipts": - yield self.pusher_pool.on_new_receipts( + self.pusher_pool.on_new_receipts( token, token, set(row.room_id for row in rows) ) except Exception: diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index e201f18efd..27e1998660 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -114,7 +114,10 @@ class SynchrotronPresence(object): logger.info("Presence process_id is %r", self.process_id) def send_user_sync(self, user_id, is_syncing, last_sync_ms): - self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms) + if self.hs.config.use_presence: + self.hs.get_tcp_replication().send_user_sync( + user_id, is_syncing, last_sync_ms + ) def mark_as_coming_online(self, user_id): """A user has started syncing. Send a UserSync to the master, unless they @@ -211,10 +214,13 @@ class SynchrotronPresence(object): yield self.notify_from_replication(states, stream_id) def get_currently_syncing_users(self): - return [ - user_id for user_id, count in iteritems(self.user_to_num_current_syncs) - if count > 0 - ] + if self.hs.config.use_presence: + return [ + user_id for user_id, count in iteritems(self.user_to_num_current_syncs) + if count > 0 + ] + else: + return set() class SynchrotronTyping(object): @@ -332,8 +338,9 @@ class SyncReplicationHandler(ReplicationClientHandler): self.presence_handler = hs.get_presence_handler() self.notifier = hs.get_notifier() + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) + yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows) run_in_background(self.process_and_notify, stream_name, token, rows) def get_streams_to_replicate(self): diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 637a89530a..1388a42b59 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -169,8 +169,9 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler): super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore()) self.user_directory = hs.get_user_directory_handler() + @defer.inlineCallbacks def on_rdata(self, stream_name, token, rows): - super(UserDirectoryReplicationHandler, self).on_rdata( + yield super(UserDirectoryReplicationHandler, self).on_rdata( stream_name, token, rows ) if stream_name == "current_state_deltas": @@ -214,11 +215,13 @@ def start(config_options): config.update_user_directory = True tls_server_context_factory = context_factory.ServerContextFactory(config) + tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config) ps = UserDirectoryServer( config.server_name, db_config=config.database_config, tls_server_context_factory=tls_server_context_factory, + tls_client_options_factory=tls_client_options_factory, config=config, version_string="Synapse/" + get_version_string(synapse), database_engine=database_engine, |