Diffstat (limited to 'synapse')
 -rwxr-xr-x  synapse/app/homeserver.py                      22
 -rw-r--r--  synapse/app/media_repository.py                10
 -rw-r--r--  synapse/app/synchrotron.py                      3
 -rw-r--r--  synapse/config/server.py                        6
 -rw-r--r--  synapse/http/endpoint.py                        6
 -rw-r--r--  synapse/http/server.py                         12
 -rw-r--r--  synapse/replication/tcp/resource.py             6
 -rw-r--r--  synapse/rest/media/v1/preview_url_resource.py   6
 -rw-r--r--  synapse/server.py                              42
 -rw-r--r--  synapse/server.pyi                              7
 -rw-r--r--  synapse/storage/_base.py                        7
 -rw-r--r--  synapse/storage/account_data.py                85
12 files changed, 138 insertions, 74 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 9e26146338..6b8875afb4 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -43,7 +43,6 @@ from synapse.rest import ClientRestResource
 from synapse.rest.key.v1.server_key_resource import LocalKey
 from synapse.rest.key.v2 import KeyApiV2Resource
 from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
 from synapse.server import HomeServer
 from synapse.storage import are_all_users_on_domain
 from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
@@ -195,14 +194,19 @@ class SynapseHomeServer(HomeServer):
             })
 
         if name in ["media", "federation", "client"]:
-            media_repo = MediaRepositoryResource(self)
-            resources.update({
-                MEDIA_PREFIX: media_repo,
-                LEGACY_MEDIA_PREFIX: media_repo,
-                CONTENT_REPO_PREFIX: ContentRepoResource(
-                    self, self.config.uploads_path
-                ),
-            })
+            if self.get_config().enable_media_repo:
+                media_repo = self.get_media_repository_resource()
+                resources.update({
+                    MEDIA_PREFIX: media_repo,
+                    LEGACY_MEDIA_PREFIX: media_repo,
+                    CONTENT_REPO_PREFIX: ContentRepoResource(
+                        self, self.config.uploads_path
+                    ),
+                })
+            elif name == "media":
+                raise ConfigError(
+                    "'media' resource conflicts with enable_media_repo=False",
+                )
 
         if name in ["keys", "federation"]:
             resources.update({
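
The homeserver hunk above gates media resource registration on the new enable_media_repo flag, and refuses to start if a listener explicitly asks for the "media" resource while the flag is off. A minimal standalone sketch of that gating logic (ConfigError, make_media_repo and the prefix string here are illustrative stand-ins, not Synapse's real wiring):

    class ConfigError(Exception):
        """Raised when the listener config contradicts the server config."""

    def build_resources(names, enable_media_repo, make_media_repo):
        resources = {}
        if any(n in names for n in ("media", "federation", "client")):
            if enable_media_repo:
                # a single resource instance serves the media prefix
                resources["/_matrix/media/r0"] = make_media_repo()
            elif "media" in names:
                # "media" was requested explicitly: fail loudly rather
                # than silently serving 404s
                raise ConfigError(
                    "'media' resource conflicts with enable_media_repo=False"
                )
        return resources
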
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 36c18bdbcb..c4e5f0965d 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -35,7 +35,6 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.replication.slave.storage.transactions import TransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.media.v0.content_repository import ContentRepoResource
-from synapse.rest.media.v1.media_repository import MediaRepositoryResource
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
 from synapse.storage.media_repository import MediaRepositoryStore
@@ -89,7 +88,7 @@ class MediaRepositoryServer(HomeServer):
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(self)
                 elif name == "media":
-                    media_repo = MediaRepositoryResource(self)
+                    media_repo = self.get_media_repository_resource()
                     resources.update({
                         MEDIA_PREFIX: media_repo,
                         LEGACY_MEDIA_PREFIX: media_repo,
@@ -151,6 +150,13 @@ def start(config_options):
 
     assert config.worker_app == "synapse.app.media_repository"
 
+    if config.enable_media_repo:
+        _base.quit_with_error(
+            "enable_media_repo must be disabled in the main synapse process\n"
+            "before the media repo can be run in a separate worker.\n"
+            "Please add ``enable_media_repo: false`` to the main config\n"
+        )
+
     setup_logging(config, use_worker_options=True)
 
     events.USE_FROZEN_DICTS = config.use_frozen_dicts
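
The start() check added here is the other half of the same invariant: the media worker refuses to run while the main process still has enable_media_repo set, so only one process ever runs the media cleanup jobs. A sketch of the guard, with quit_with_error standing in for synapse.app._base.quit_with_error:

    import sys

    def quit_with_error(message):
        sys.stderr.write(message + "\n")
        sys.exit(1)

    def check_media_worker_config(config):
        # refuse to start if the master also claims the media repository
        if config.get("enable_media_repo", True):
            quit_with_error(
                "enable_media_repo must be disabled in the main synapse "
                "process before the media repo can be run in a separate "
                "worker."
            )
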
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 576ac6fb7e..323fddee21 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -340,11 +340,10 @@ class SyncReplicationHandler(ReplicationClientHandler):
 
         self.store = hs.get_datastore()
         self.typing_handler = hs.get_typing_handler()
+        # NB this is a SynchrotronPresence, not a normal PresenceHandler
         self.presence_handler = hs.get_presence_handler()
         self.notifier = hs.get_notifier()
 
-        self.presence_handler.sync_callback = self.send_user_sync
-
     def on_rdata(self, stream_name, token, rows):
         super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
 
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 4d9193536d..edb90a1348 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -41,6 +41,12 @@ class ServerConfig(Config):
         # false only if we are updating the user directory in a worker
         self.update_user_directory = config.get("update_user_directory", True)
 
+        # whether to enable the media repository endpoints. This should be
+        # set to false if the media repository is running as a separate
+        # worker; otherwise the master will also run the cache cleanup
+        # jobs, potentially causing inconsistency.
+        self.enable_media_repo = config.get("enable_media_repo", True)
+
         self.filter_timeline_limit = config.get("filter_timeline_limit", -1)
 
         # Whether we should block invites sent to users on this server
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index a97532162f..e2b99ef3bd 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -362,8 +362,10 @@ def _get_hosts_for_srv_record(dns_client, host):
         return res
 
     # no logcontexts here, so we can safely fire these off and gatherResults
-    d1 = dns_client.lookupAddress(host).addCallbacks(cb, eb)
-    d2 = dns_client.lookupIPV6Address(host).addCallbacks(cb, eb)
+    d1 = dns_client.lookupAddress(host).addCallbacks(
+        cb, eb, errbackArgs=("A", ))
+    d2 = dns_client.lookupIPV6Address(host).addCallbacks(
+        cb, eb, errbackArgs=("AAAA", ))
     results = yield defer.DeferredList(
         [d1, d2], consumeErrors=True)
 
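
Twisted's addCallbacks accepts errbackArgs, a tuple of extra positional arguments appended after the Failure when the errback fires; that is how the shared errback above can tell whether the A or the AAAA lookup failed. A small self-contained example:

    from twisted.internet import defer

    def eb(failure, record_type):
        # record_type ("A" or "AAAA") arrives via errbackArgs
        print("%s lookup failed: %s" % (record_type, failure.getErrorMessage()))

    d = defer.Deferred()
    d.addCallbacks(lambda res: res, eb, errbackArgs=("AAAA",))
    d.errback(RuntimeError("no route to host"))
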
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 3ca1c9947c..25466cd292 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -28,6 +28,7 @@ from canonicaljson import (
 )
 
 from twisted.internet import defer
+from twisted.python import failure
 from twisted.web import server, resource
 from twisted.web.server import NOT_DONE_YET
 from twisted.web.util import redirectTo
@@ -131,12 +132,17 @@ def wrap_request_handler(request_handler, include_metrics=False):
                             version_string=self.version_string,
                         )
                     except Exception:
-                        logger.exception(
-                            "Failed handle request %s.%s on %r: %r",
+                        # failure.Failure() fishes the original Failure out
+                        # of our stack, and thus gives us a sensible stack
+                        # trace.
+                        f = failure.Failure()
+                        logger.error(
+                            "Failed handle request %s.%s on %r: %r: %s",
                             request_handler.__module__,
                             request_handler.__name__,
                             self,
-                            request
+                            request,
+                            f.getTraceback().rstrip(),
                         )
                         respond_with_json(
                             request,
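
twisted.python.failure.Failure(), constructed with no arguments inside an except block, wraps the exception currently being handled together with its traceback; getTraceback() then renders it as a string suitable for a single log line, which is what the hunk above relies on. For example:

    from twisted.python import failure

    try:
        1 / 0
    except Exception:
        f = failure.Failure()  # captures the in-flight exception
        print(f.getTraceback().rstrip())
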
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 1d03e79b85..786c3fe864 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -216,11 +216,12 @@ class ReplicationStreamer(object):
             self.federation_sender.federation_ack(token)
 
     @measure_func("repl.on_user_sync")
+    @defer.inlineCallbacks
     def on_user_sync(self, conn_id, user_id, is_syncing, last_sync_ms):
         """A client has started/stopped syncing on a worker.
         """
         user_sync_counter.inc()
-        self.presence_handler.update_external_syncs_row(
+        yield self.presence_handler.update_external_syncs_row(
             conn_id, user_id, is_syncing, last_sync_ms,
         )
 
@@ -244,11 +245,12 @@ class ReplicationStreamer(object):
         getattr(self.store, cache_func).invalidate(tuple(keys))
 
     @measure_func("repl.on_user_ip")
+    @defer.inlineCallbacks
     def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen):
         """The client saw a user request
         """
         user_ip_cache_counter.inc()
-        self.store.insert_client_ip(
+        yield self.store.insert_client_ip(
             user_id, access_token, ip, user_agent, device_id, last_seen,
         )
 
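
Before this change, on_user_sync and on_user_ip fired off deferreds and discarded them, so any failure in the write was lost; decorating with @defer.inlineCallbacks and yielding ties the write's outcome into the deferred returned to the caller. A minimal sketch of the difference, using a stand-in flaky_write in place of the store call:

    from twisted.internet import defer

    def flaky_write():
        return defer.fail(RuntimeError("db went away"))

    @defer.inlineCallbacks
    def on_event():
        # yielding means a failure in flaky_write() propagates to
        # whoever waits on on_event(), instead of vanishing
        yield flaky_write()

    def report(f):
        print("caught: %s" % f.getErrorMessage())

    on_event().addErrback(report)
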
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index 385e4079ec..40d2e664eb 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -25,7 +25,8 @@ from synapse.util.stringutils import random_string
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.http.client import SpiderHttpClient
 from synapse.http.server import (
-    request_handler, respond_with_json_bytes
+    request_handler, respond_with_json_bytes,
+    respond_with_json,
 )
 from synapse.util.async import ObservableDeferred
 from synapse.util.stringutils import is_ascii
@@ -78,6 +79,9 @@ class PreviewUrlResource(Resource):
             self._expire_url_cache_data, 10 * 1000
         )
 
+    def render_OPTIONS(self, request):
+        return respond_with_json(request, 200, {}, send_cors=True)
+
     def render_GET(self, request):
         self._async_render_GET(request)
         return NOT_DONE_YET
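
twisted.web dispatches to render_<METHOD> by HTTP verb, so defining render_OPTIONS lets the preview endpoint answer CORS preflight requests itself instead of returning 405. A standalone sketch (the headers shown are illustrative; synapse's respond_with_json sets its own CORS headers):

    from twisted.web import resource

    class PreviewStub(resource.Resource):
        isLeaf = True

        def render_OPTIONS(self, request):
            # answer the browser's CORS preflight directly
            request.setHeader(b"Access-Control-Allow-Origin", b"*")
            request.setHeader(b"Access-Control-Allow-Methods", b"GET, OPTIONS")
            return b"{}"
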
diff --git a/synapse/server.py b/synapse/server.py
index 10e3e9a4f1..853f4647b7 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -60,7 +60,10 @@ from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.notifier import Notifier
 from synapse.push.action_generator import ActionGenerator
 from synapse.push.pusherpool import PusherPool
-from synapse.rest.media.v1.media_repository import MediaRepository
+from synapse.rest.media.v1.media_repository import (
+    MediaRepository,
+    MediaRepositoryResource,
+)
 from synapse.state import StateHandler
 from synapse.storage import DataStore
 from synapse.streams.events import EventSources
@@ -90,17 +93,12 @@ class HomeServer(object):
     """
 
     DEPENDENCIES = [
-        'config',
-        'clock',
         'http_client',
         'db_pool',
-        'persistence_service',
         'replication_layer',
-        'datastore',
         'handlers',
         'v1auth',
         'auth',
-        'rest_servlet_factory',
         'state_handler',
         'presence_handler',
         'sync_handler',
@@ -118,18 +116,7 @@ class HomeServer(object):
         'device_message_handler',
         'profile_handler',
         'notifier',
-        'distributor',
-        'client_resource',
-        'resource_for_federation',
-        'resource_for_static_content',
-        'resource_for_web_client',
-        'resource_for_content_repo',
-        'resource_for_server_key',
-        'resource_for_server_key_v2',
-        'resource_for_media_repository',
-        'resource_for_metrics',
         'event_sources',
-        'ratelimiter',
         'keyring',
         'pusherpool',
         'event_builder_factory',
@@ -137,6 +124,7 @@ class HomeServer(object):
         'http_client_context_factory',
         'simple_http_client',
         'media_repository',
+        'media_repository_resource',
         'federation_transport_client',
         'federation_sender',
         'receipts_handler',
@@ -183,6 +171,21 @@ class HomeServer(object):
     def is_mine_id(self, string):
         return string.split(":", 1)[1] == self.hostname
 
+    def get_clock(self):
+        return self.clock
+
+    def get_datastore(self):
+        return self.datastore
+
+    def get_config(self):
+        return self.config
+
+    def get_distributor(self):
+        return self.distributor
+
+    def get_ratelimiter(self):
+        return self.ratelimiter
+
     def build_replication_layer(self):
         return initialize_http_replication(self)
 
@@ -294,6 +297,11 @@ class HomeServer(object):
             **self.db_config.get("args", {})
         )
 
+    def build_media_repository_resource(self):
+        # build the media repo resource. This indirects through the HomeServer
+    # to ensure that we only have a single instance of the media repo.
+        return MediaRepositoryResource(self)
+
     def build_media_repository(self):
         return MediaRepository(self)
 
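
The DEPENDENCIES list drives HomeServer's cached accessors: for each name, a get_<name>() is synthesised that calls build_<name>() once and memoises the result, which is why every caller of get_media_repository_resource() shares a single MediaRepositoryResource. A rough sketch of the pattern (not synapse's exact metaprogramming):

    def _make_getter(depname):
        def getter(self):
            attr = "_" + depname
            try:
                return getattr(self, attr)
            except AttributeError:
                # first access: build the dependency and cache it so
                # later calls return the same instance
                dep = getattr(self, "build_" + depname)()
                setattr(self, attr, dep)
                return dep
        getter.__name__ = "get_%s" % depname
        return getter

    class Server(object):
        DEPENDENCIES = ["media_repository_resource"]

        def build_media_repository_resource(self):
            return object()  # stand-in for MediaRepositoryResource(self)

    for _dep in Server.DEPENDENCIES:
        setattr(Server, "get_%s" % _dep, _make_getter(_dep))

    hs = Server()
    assert hs.get_media_repository_resource() is hs.get_media_repository_resource()
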
diff --git a/synapse/server.pyi b/synapse/server.pyi
index e8c0386b7f..3064a497eb 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -5,6 +5,7 @@ import synapse.handlers
 import synapse.handlers.auth
 import synapse.handlers.device
 import synapse.handlers.e2e_keys
+import synapse.rest.media.v1.media_repository
 import synapse.storage
 import synapse.state
 
@@ -35,3 +36,9 @@ class HomeServer(object):
 
     def get_federation_transport_client(self) -> synapse.federation.transport.client.TransportLayerClient:
         pass
+
+    def get_media_repository_resource(self) -> synapse.rest.media.v1.media_repository.MediaRepositoryResource:
+        pass
+
+    def get_media_repository(self) -> synapse.rest.media.v1.media_repository.MediaRepository:
+        pass
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 470f7881ab..20fa25895d 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -495,6 +495,7 @@ class SQLBaseStore(object):
             Deferred(bool): True if a new entry was created, False if an
                 existing one was updated.
         """
+        attempts = 0
         while True:
             try:
                 result = yield self.runInteraction(
@@ -504,6 +505,12 @@ class SQLBaseStore(object):
                 )
                 defer.returnValue(result)
             except self.database_engine.module.IntegrityError as e:
+                attempts += 1
+                if attempts >= 5:
+                    # don't retry forever, because things other than races
+                    # can cause IntegrityErrors
+                    raise
+
                 # presumably we raced with another transaction: let's retry.
                 logger.warn(
                     "IntegrityError when upserting into %s; retrying: %s",
diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py
index c8a1eb016b..56a0bde549 100644
--- a/synapse/storage/account_data.py
+++ b/synapse/storage/account_data.py
@@ -222,9 +222,12 @@ class AccountDataStore(SQLBaseStore):
         """
         content_json = json.dumps(content)
 
-        def add_account_data_txn(txn, next_id):
-            self._simple_upsert_txn(
-                txn,
+        with self._account_data_id_gen.get_next() as next_id:
+            # no need to lock here as room_account_data has a unique constraint
+            # on (user_id, room_id, account_data_type) so _simple_upsert will
+            # retry if there is a conflict.
+            yield self._simple_upsert(
+                desc="add_room_account_data",
                 table="room_account_data",
                 keyvalues={
                     "user_id": user_id,
@@ -234,19 +237,20 @@ class AccountDataStore(SQLBaseStore):
                 values={
                     "stream_id": next_id,
                     "content": content_json,
-                }
-            )
-            txn.call_after(
-                self._account_data_stream_cache.entity_has_changed,
-                user_id, next_id,
+                },
+                lock=False,
             )
-            txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
-            self._update_max_stream_id(txn, next_id)
 
-        with self._account_data_id_gen.get_next() as next_id:
-            yield self.runInteraction(
-                "add_room_account_data", add_account_data_txn, next_id
-            )
+            # it's theoretically possible for the above to succeed and the
+            # below to fail - in which case we might reuse a stream id on
+            # restart, and the above update might not get propagated. That
+            # doesn't sound any worse than the whole update getting lost,
+            # which is what would happen if we combined the two into one
+            # transaction.
+            yield self._update_max_stream_id(next_id)
+
+            self._account_data_stream_cache.entity_has_changed(user_id, next_id)
+            self.get_account_data_for_user.invalidate((user_id,))
 
         result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
@@ -263,9 +267,12 @@ class AccountDataStore(SQLBaseStore):
         """
         content_json = json.dumps(content)
 
-        def add_account_data_txn(txn, next_id):
-            self._simple_upsert_txn(
-                txn,
+        with self._account_data_id_gen.get_next() as next_id:
+            # no need to lock here as account_data has a unique constraint on
+            # (user_id, account_data_type) so _simple_upsert will retry if
+            # there is a conflict.
+            yield self._simple_upsert(
+                desc="add_user_account_data",
                 table="account_data",
                 keyvalues={
                     "user_id": user_id,
@@ -274,40 +281,46 @@ class AccountDataStore(SQLBaseStore):
                 values={
                     "stream_id": next_id,
                     "content": content_json,
-                }
+                },
+                lock=False,
             )
-            txn.call_after(
-                self._account_data_stream_cache.entity_has_changed,
+
+            # it's theoretically possible for the above to succeed and the
+            # below to fail - in which case we might reuse a stream id on
+            # restart, and the above update might not get propagated. That
+            # doesn't sound any worse than the whole update getting lost,
+            # which is what would happen if we combined the two into one
+            # transaction.
+            yield self._update_max_stream_id(next_id)
+
+            self._account_data_stream_cache.entity_has_changed(
                 user_id, next_id,
             )
-            txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
-            txn.call_after(
-                self.get_global_account_data_by_type_for_user.invalidate,
+            self.get_account_data_for_user.invalidate((user_id,))
+            self.get_global_account_data_by_type_for_user.invalidate(
                 (account_data_type, user_id,)
             )
-            self._update_max_stream_id(txn, next_id)
-
-        with self._account_data_id_gen.get_next() as next_id:
-            yield self.runInteraction(
-                "add_user_account_data", add_account_data_txn, next_id
-            )
 
         result = self._account_data_id_gen.get_current_token()
         defer.returnValue(result)
 
-    def _update_max_stream_id(self, txn, next_id):
+    def _update_max_stream_id(self, next_id):
         """Update the max stream_id
 
         Args:
-            txn: The database cursor
             next_id(int): The revision to advance to.
         """
-        update_max_id_sql = (
-            "UPDATE account_data_max_stream_id"
-            " SET stream_id = ?"
-            " WHERE stream_id < ?"
+        def _update(txn):
+            update_max_id_sql = (
+                "UPDATE account_data_max_stream_id"
+                " SET stream_id = ?"
+                " WHERE stream_id < ?"
+            )
+            txn.execute(update_max_id_sql, (next_id, next_id))
+        return self.runInteraction(
+            "update_account_data_max_stream_id",
+            _update,
         )
-        txn.execute(update_max_id_sql, (next_id, next_id))
 
     @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000)
     def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context):
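
The "with self._account_data_id_gen.get_next() as next_id:" idiom used above hands out a new stream id whose use is only published once the block completes. A toy model of that contract (not synapse's real StreamIdGenerator, which additionally tracks multiple in-flight writers):

    from contextlib import contextmanager

    class ToyStreamIdGenerator(object):
        def __init__(self, current):
            self._current = current

        @contextmanager
        def get_next(self):
            next_id = self._current + 1
            yield next_id
            # only advance the advertised token once the writes using
            # next_id are done, so readers never observe a stream id
            # whose rows are still being written
            self._current = next_id

        def get_current_token(self):
            return self._current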