summary refs log tree commit diff
path: root/synapse/app
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-08-20 14:49:43 +0100
committerErik Johnston <erik@matrix.org>2018-08-20 14:49:43 +0100
commit4d664278afc044e0d6118de4e22bccbf5464402c (patch)
tree58a6364007ecfbdb6f51f997bbd86f4e538dd8ef /synapse/app
parentRemove redundant room_version checks (diff)
parentMerge pull request #3719 from matrix-org/erikj/use_cache_fact (diff)
downloadsynapse-4d664278afc044e0d6118de4e22bccbf5464402c.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/refactor_state_handler
Diffstat (limited to 'synapse/app')
-rw-r--r--synapse/app/_base.py6
-rw-r--r--synapse/app/appservice.py3
-rw-r--r--synapse/app/client_reader.py6
-rw-r--r--synapse/app/event_creator.py6
-rw-r--r--synapse/app/federation_reader.py18
-rw-r--r--synapse/app/federation_sender.py11
-rw-r--r--synapse/app/frontend_proxy.py41
-rwxr-xr-xsynapse/app/homeserver.py9
-rw-r--r--synapse/app/media_repository.py6
-rw-r--r--synapse/app/pusher.py7
-rw-r--r--synapse/app/synchrotron.py19
-rw-r--r--synapse/app/user_dir.py5
12 files changed, 108 insertions, 29 deletions
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 391bd14c5c..7c866e246a 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -140,7 +140,7 @@ def listen_metrics(bind_addresses, port):
         logger.info("Metrics now reporting on %s:%d", host, port)
 
 
-def listen_tcp(bind_addresses, port, factory, backlog=50):
+def listen_tcp(bind_addresses, port, factory, reactor=reactor, backlog=50):
     """
     Create a TCP socket for a port and several addresses
     """
@@ -156,7 +156,9 @@ def listen_tcp(bind_addresses, port, factory, backlog=50):
             check_bind_error(e, address, bind_addresses)
 
 
-def listen_ssl(bind_addresses, port, factory, context_factory, backlog=50):
+def listen_ssl(
+    bind_addresses, port, factory, context_factory, reactor=reactor, backlog=50
+):
     """
     Create an SSL socket for a port and several addresses
     """
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 9a37384fb7..3348a8ec6d 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -117,8 +117,9 @@ class ASReplicationHandler(ReplicationClientHandler):
         super(ASReplicationHandler, self).__init__(hs.get_datastore())
         self.appservice_handler = hs.get_application_service_handler()
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
+        yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
 
         if stream_name == "events":
             max_stream_id = self.store.get_room_max_stream_ordering()
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index e2c91123db..ab79a45646 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -39,7 +39,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import (
     JoinedRoomMemberListRestServlet,
@@ -66,7 +66,7 @@ class ClientReaderSlavedStore(
     DirectoryStore,
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
-    TransactionStore,
+    SlavedTransactionStore,
     SlavedClientIpStore,
     BaseSlavedStore,
 ):
@@ -168,11 +168,13 @@ def start(config_options):
     database_engine = create_engine(config.database_config)
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ss = ClientReaderServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 374f115644..03d39968a8 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -43,7 +43,7 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import (
     JoinRoomAliasServlet,
@@ -63,7 +63,7 @@ logger = logging.getLogger("synapse.app.event_creator")
 
 class EventCreatorSlavedStore(
     DirectoryStore,
-    TransactionStore,
+    SlavedTransactionStore,
     SlavedProfileStore,
     SlavedAccountDataStore,
     SlavedPusherStore,
@@ -174,11 +174,13 @@ def start(config_options):
     database_engine = create_engine(config.database_config)
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ss = EventCreatorServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 7af00b8bcf..7d8105778d 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -32,11 +32,17 @@ from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.profile import SlavedProfileStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.pushers import SlavedPusherStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -49,11 +55,17 @@ logger = logging.getLogger("synapse.app.federation_reader")
 
 
 class FederationReaderSlavedStore(
+    SlavedAccountDataStore,
+    SlavedProfileStore,
+    SlavedApplicationServiceStore,
+    SlavedPusherStore,
+    SlavedPushRuleStore,
+    SlavedReceiptsStore,
     SlavedEventStore,
     SlavedKeyStore,
     RoomStore,
     DirectoryStore,
-    TransactionStore,
+    SlavedTransactionStore,
     BaseSlavedStore,
 ):
     pass
@@ -143,11 +155,13 @@ def start(config_options):
     database_engine = create_engine(config.database_config)
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ss = FederationReaderServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 18469013fa..d59007099b 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -36,11 +36,11 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
-from synapse.util.async import Linearizer
+from synapse.util.async_helpers import Linearizer
 from synapse.util.httpresourcetree import create_resource_tree
 from synapse.util.logcontext import LoggingContext, run_in_background
 from synapse.util.manhole import manhole
@@ -50,7 +50,7 @@ logger = logging.getLogger("synapse.app.federation_sender")
 
 
 class FederationSenderSlaveStore(
-    SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
+    SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
     SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
 ):
     def __init__(self, db_conn, hs):
@@ -144,8 +144,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
         super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
         self.send_handler = FederationSenderHandler(hs, self)
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(FederationSenderReplicationHandler, self).on_rdata(
+        yield super(FederationSenderReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
         self.send_handler.process_replication_rows(stream_name, token, rows)
@@ -186,11 +187,13 @@ def start(config_options):
     config.send_federation = True
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ps = FederationSenderServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index b5f78f4640..8d484c1cd4 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -38,6 +38,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.devices import SlavedDeviceStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
 from synapse.rest.client.v2_alpha._base import client_v2_patterns
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -49,6 +50,35 @@ from synapse.util.versionstring import get_version_string
 logger = logging.getLogger("synapse.app.frontend_proxy")
 
 
+class PresenceStatusStubServlet(ClientV1RestServlet):
+    PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
+
+    def __init__(self, hs):
+        super(PresenceStatusStubServlet, self).__init__(hs)
+        self.http_client = hs.get_simple_http_client()
+        self.auth = hs.get_auth()
+        self.main_uri = hs.config.worker_main_http_uri
+
+    @defer.inlineCallbacks
+    def on_GET(self, request, user_id):
+        # Pass through the auth headers, if any, in case the access token
+        # is there.
+        auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
+        headers = {
+            "Authorization": auth_headers,
+        }
+        result = yield self.http_client.get_json(
+            self.main_uri + request.uri,
+            headers=headers,
+        )
+        defer.returnValue((200, result))
+
+    @defer.inlineCallbacks
+    def on_PUT(self, request, user_id):
+        yield self.auth.get_user_by_req(request)
+        defer.returnValue((200, {}))
+
+
 class KeyUploadServlet(RestServlet):
     PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
 
@@ -135,6 +165,12 @@ class FrontendProxyServer(HomeServer):
                 elif name == "client":
                     resource = JsonResource(self, canonical_json=False)
                     KeyUploadServlet(self).register(resource)
+
+                    # If presence is disabled, use the stub servlet that does
+                    # not allow sending presence
+                    if not self.config.use_presence:
+                        PresenceStatusStubServlet(self).register(resource)
+
                     resources.update({
                         "/_matrix/client/r0": resource,
                         "/_matrix/client/unstable": resource,
@@ -153,7 +189,8 @@ class FrontendProxyServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
+            reactor=self.get_reactor()
         )
 
         logger.info("Synapse client reader now listening on port %d", port)
@@ -208,11 +245,13 @@ def start(config_options):
     database_engine = create_engine(config.database_config)
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ss = FrontendProxyServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index a4a65e7286..005921dcf7 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -303,8 +303,8 @@ class SynapseHomeServer(HomeServer):
 
 
 # Gauges to expose monthly active user control metrics
-current_mau_gauge = Gauge("synapse_admin_current_mau", "Current MAU")
-max_mau_value_gauge = Gauge("synapse_admin_max_mau_value", "MAU Limit")
+current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
+max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
 
 
 def setup(config_options):
@@ -338,6 +338,7 @@ def setup(config_options):
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     database_engine = create_engine(config.database_config)
     config.database_config["args"]["cp_openfun"] = database_engine.on_new_connection
@@ -346,6 +347,7 @@ def setup(config_options):
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
@@ -523,6 +525,7 @@ def run(hs):
     clock.looping_call(
         hs.get_datastore().reap_monthly_active_users, 1000 * 60 * 60
     )
+    hs.get_datastore().reap_monthly_active_users()
 
     @defer.inlineCallbacks
     def generate_monthly_active_users():
@@ -530,7 +533,7 @@ def run(hs):
         if hs.config.limit_usage_by_mau:
             count = yield hs.get_datastore().get_monthly_active_count()
         current_mau_gauge.set(float(count))
-        max_mau_value_gauge.set(float(hs.config.max_mau_value))
+        max_mau_gauge.set(float(hs.config.max_mau_value))
 
     hs.get_datastore().initialise_reserved_users(
         hs.config.mau_limits_reserved_threepids
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 749bbf37d0..fd1f6cbf7e 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -34,7 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.server import HomeServer
@@ -52,7 +52,7 @@ class MediaRepositorySlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
     SlavedClientIpStore,
-    TransactionStore,
+    SlavedTransactionStore,
     BaseSlavedStore,
     MediaRepositoryStore,
 ):
@@ -155,11 +155,13 @@ def start(config_options):
     database_engine = create_engine(config.database_config)
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ss = MediaRepositoryServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 9295a51d5b..a4fc7e91fa 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -148,8 +148,9 @@ class PusherReplicationHandler(ReplicationClientHandler):
 
         self.pusher_pool = hs.get_pusherpool()
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
+        yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
         run_in_background(self.poke_pushers, stream_name, token, rows)
 
     @defer.inlineCallbacks
@@ -162,11 +163,11 @@ class PusherReplicationHandler(ReplicationClientHandler):
                     else:
                         yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
             elif stream_name == "events":
-                yield self.pusher_pool.on_new_notifications(
+                self.pusher_pool.on_new_notifications(
                     token, token,
                 )
             elif stream_name == "receipts":
-                yield self.pusher_pool.on_new_receipts(
+                self.pusher_pool.on_new_receipts(
                     token, token, set(row.room_id for row in rows)
                 )
         except Exception:
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index e201f18efd..27e1998660 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -114,7 +114,10 @@ class SynchrotronPresence(object):
         logger.info("Presence process_id is %r", self.process_id)
 
     def send_user_sync(self, user_id, is_syncing, last_sync_ms):
-        self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
+        if self.hs.config.use_presence:
+            self.hs.get_tcp_replication().send_user_sync(
+                user_id, is_syncing, last_sync_ms
+            )
 
     def mark_as_coming_online(self, user_id):
         """A user has started syncing. Send a UserSync to the master, unless they
@@ -211,10 +214,13 @@ class SynchrotronPresence(object):
         yield self.notify_from_replication(states, stream_id)
 
     def get_currently_syncing_users(self):
-        return [
-            user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
-            if count > 0
-        ]
+        if self.hs.config.use_presence:
+            return [
+                user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
+                if count > 0
+            ]
+        else:
+            return set()
 
 
 class SynchrotronTyping(object):
@@ -332,8 +338,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
         self.presence_handler = hs.get_presence_handler()
         self.notifier = hs.get_notifier()
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
+        yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
         run_in_background(self.process_and_notify, stream_name, token, rows)
 
     def get_streams_to_replicate(self):
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index 637a89530a..1388a42b59 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -169,8 +169,9 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
         super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
         self.user_directory = hs.get_user_directory_handler()
 
+    @defer.inlineCallbacks
     def on_rdata(self, stream_name, token, rows):
-        super(UserDirectoryReplicationHandler, self).on_rdata(
+        yield super(UserDirectoryReplicationHandler, self).on_rdata(
             stream_name, token, rows
         )
         if stream_name == "current_state_deltas":
@@ -214,11 +215,13 @@ def start(config_options):
     config.update_user_directory = True
 
     tls_server_context_factory = context_factory.ServerContextFactory(config)
+    tls_client_options_factory = context_factory.ClientTLSOptionsFactory(config)
 
     ps = UserDirectoryServer(
         config.server_name,
         db_config=config.database_config,
         tls_server_context_factory=tls_server_context_factory,
+        tls_client_options_factory=tls_client_options_factory,
         config=config,
         version_string="Synapse/" + get_version_string(synapse),
         database_engine=database_engine,