Diffstat (limited to 'synapse')
-rw-r--r--  synapse/__init__.py           |  2
-rw-r--r--  synapse/config/workers.py     |  1
-rw-r--r--  synapse/crypto/keyring.py     | 92
-rw-r--r--  synapse/storage/__init__.py   |  6
-rw-r--r--  synapse/storage/roommember.py | 48
-rw-r--r--  synapse/util/versionstring.py | 23
6 files changed, 97 insertions, 75 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index cf22fabd61..f26e49da36 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -35,4 +35,4 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.1.0"
+__version__ = "1.2.0rc1"
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 246d72cd61..bc0fc165e3 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -31,6 +31,7 @@ class WorkerConfig(Config):
         self.worker_listeners = config.get("worker_listeners", [])
         self.worker_daemonize = config.get("worker_daemonize")
         self.worker_pid_file = config.get("worker_pid_file")
+        self.worker_log_config = config.get("worker_log_config")
 
         # The host used to connect to the main synapse
         self.worker_replication_host = config.get("worker_replication_host", None)
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 341c863152..e8bb420ad1 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -238,27 +238,9 @@ class Keyring(object):
         """
 
         try:
-            # create a deferred for each server we're going to look up the keys
-            # for; we'll resolve them once we have completed our lookups.
-            # These will be passed into wait_for_previous_lookups to block
-            # any other lookups until we have finished.
-            # The deferreds are called with no logcontext.
-            server_to_deferred = {
-                rq.server_name: defer.Deferred() for rq in verify_requests
-            }
-
-            # We want to wait for any previous lookups to complete before
-            # proceeding.
-            yield self.wait_for_previous_lookups(server_to_deferred)
+            ctx = LoggingContext.current_context()
 
-            # Actually start fetching keys.
-            self._get_server_verify_keys(verify_requests)
-
-            # When we've finished fetching all the keys for a given server_name,
-            # resolve the deferred passed to `wait_for_previous_lookups` so that
-            # any lookups waiting will proceed.
-            #
-            # map from server name to a set of request ids
+            # map from server name to a set of outstanding request ids
             server_to_request_ids = {}
 
             for verify_request in verify_requests:
@@ -266,40 +248,61 @@ class Keyring(object):
                 request_id = id(verify_request)
                 server_to_request_ids.setdefault(server_name, set()).add(request_id)
 
-            def remove_deferreds(res, verify_request):
+            # Wait for any previous lookups to complete before proceeding.
+            yield self.wait_for_previous_lookups(server_to_request_ids.keys())
+
+            # take out a lock on each of the servers by sticking a Deferred in
+            # key_downloads
+            for server_name in server_to_request_ids.keys():
+                self.key_downloads[server_name] = defer.Deferred()
+                logger.debug("Got key lookup lock on %s", server_name)
+
+            # When we've finished fetching all the keys for a given server_name,
+            # drop the lock by resolving the deferred in key_downloads.
+            def drop_server_lock(server_name):
+                d = self.key_downloads.pop(server_name)
+                d.callback(None)
+
+            def lookup_done(res, verify_request):
                 server_name = verify_request.server_name
-                request_id = id(verify_request)
-                server_to_request_ids[server_name].discard(request_id)
-                if not server_to_request_ids[server_name]:
-                    d = server_to_deferred.pop(server_name, None)
-                    if d:
-                        d.callback(None)
+                server_requests = server_to_request_ids[server_name]
+                server_requests.remove(id(verify_request))
+
+                # if there are no more requests for this server, we can drop the lock.
+                if not server_requests:
+                    with PreserveLoggingContext(ctx):
+                        logger.debug("Releasing key lookup lock on %s", server_name)
+
+                    # ... but not immediately, as that can cause stack explosions if
+                    # we get a long queue of lookups.
+                    self.clock.call_later(0, drop_server_lock, server_name)
+
                 return res
 
             for verify_request in verify_requests:
-                verify_request.key_ready.addBoth(remove_deferreds, verify_request)
+                verify_request.key_ready.addBoth(lookup_done, verify_request)
+
+            # Actually start fetching keys.
+            self._get_server_verify_keys(verify_requests)
         except Exception:
             logger.exception("Error starting key lookups")
 
     @defer.inlineCallbacks
-    def wait_for_previous_lookups(self, server_to_deferred):
+    def wait_for_previous_lookups(self, server_names):
         """Waits for any previous key lookups for the given servers to finish.
 
         Args:
-            server_to_deferred (dict[str, Deferred]): server_name to deferred which gets
-                resolved once we've finished looking up keys for that server.
-                The Deferreds should be regular twisted ones which call their
-                callbacks with no logcontext.
-
-        Returns: a Deferred which resolves once all key lookups for the given
-            servers have completed. Follows the synapse rules of logcontext
-            preservation.
+            server_names (Iterable[str]): list of servers which we want to look up
+
+        Returns:
+            Deferred[None]: resolves once all key lookups for the given servers have
+                completed. Follows the synapse rules of logcontext preservation.
         """
         loop_count = 1
         while True:
             wait_on = [
                 (server_name, self.key_downloads[server_name])
-                for server_name in server_to_deferred.keys()
+                for server_name in server_names
                 if server_name in self.key_downloads
             ]
             if not wait_on:
@@ -314,19 +317,6 @@ class Keyring(object):
 
             loop_count += 1
 
-        ctx = LoggingContext.current_context()
-
-        def rm(r, server_name_):
-            with PreserveLoggingContext(ctx):
-                logger.debug("Releasing key lookup lock on %s", server_name_)
-                self.key_downloads.pop(server_name_, None)
-            return r
-
-        for server_name, deferred in server_to_deferred.items():
-            logger.debug("Got key lookup lock on %s", server_name)
-            self.key_downloads[server_name] = deferred
-            deferred.addBoth(rm, server_name)
-
     def _get_server_verify_keys(self, verify_requests):
         """Tries to find at least one key for each verify request
 
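The comments added in this hunk describe the new locking scheme: a bare Deferred stored in key_downloads acts as a per-server lock, and the lock is released on a later reactor tick so that a long queue of waiting lookups cannot build a deep callback stack. The snippet below is a minimal standalone sketch of that pattern; the class and method names are illustrative rather than Synapse's actual API, and the logcontext handling from the real code is omitted.

from twisted.internet import defer, reactor


class ServerLockMap(object):
    """Illustrative per-server lock built from plain Deferreds."""

    def __init__(self):
        # map from server_name to a Deferred which fires once the current
        # lookup for that server has finished
        self._locks = {}

    @defer.inlineCallbacks
    def wait_for_previous(self, server_names):
        # Keep waiting until none of the given servers has an in-flight lookup.
        while True:
            wait_on = [self._locks[s] for s in server_names if s in self._locks]
            if not wait_on:
                break
            yield defer.DeferredList(wait_on)

    def acquire(self, server_name):
        # Taking the lock is just storing an unresolved Deferred in the map.
        self._locks[server_name] = defer.Deferred()

    def release(self, server_name):
        # Resolve the Deferred on a fresh reactor tick rather than inline, so
        # that a long chain of queued lookups does not recurse deeply.
        def _drop():
            self._locks.pop(server_name).callback(None)

        reactor.callLater(0, _drop)

In the commit itself the same Deferreds are what wait_for_previous_lookups waits on, and PreserveLoggingContext is used so the release is logged against the calling request's logcontext.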
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 6b0ca80087..86a333a919 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -469,7 +469,7 @@ class DataStore(
         return self._simple_select_list(
             table="users",
             keyvalues={},
-            retcols=["name", "password_hash", "is_guest", "admin"],
+            retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
             desc="get_users",
         )
 
@@ -494,7 +494,7 @@ class DataStore(
             orderby=order,
             start=start,
             limit=limit,
-            retcols=["name", "password_hash", "is_guest", "admin"],
+            retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
         )
         count = yield self.runInteraction("get_users_paginate", self.get_user_count_txn)
         retval = {"users": users, "total": count}
@@ -514,7 +514,7 @@ class DataStore(
             table="users",
             term=term,
             col="name",
-            retcols=["name", "password_hash", "is_guest", "admin"],
+            retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
             desc="search_users",
         )
 
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 257bcdb2f8..b3c002b9eb 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -852,22 +852,25 @@ class RoomMemberStore(RoomMemberWorkerStore):
     @defer.inlineCallbacks
     def _background_current_state_membership(self, progress, batch_size):
         """Update the new membership column on current_state_events.
+
+        This works by iterating over all rooms in alphabetical order.
         """
 
-        if "rooms" not in progress:
-            rooms = yield self._simple_select_onecol(
-                table="current_state_events",
-                keyvalues={},
-                retcol="DISTINCT room_id",
-                desc="_background_current_state_membership_get_rooms",
-            )
-            progress["rooms"] = rooms
+        def _background_current_state_membership_txn(txn, last_processed_room):
+            processed = 0
+            while processed < batch_size:
+                txn.execute(
+                    """
+                        SELECT MIN(room_id) FROM rooms WHERE room_id > ?
+                    """,
+                    (last_processed_room,),
+                )
+                row = txn.fetchone()
+                if not row or not row[0]:
+                    return processed, True
 
-        rooms = progress["rooms"]
+                next_room, = row
 
-        def _background_current_state_membership_txn(txn):
-            processed = 0
-            while rooms and processed < batch_size:
                 sql = """
                     UPDATE current_state_events AS c
                     SET membership = (
@@ -876,24 +879,33 @@ class RoomMemberStore(RoomMemberWorkerStore):
                     )
                     WHERE room_id = ?
                 """
-                txn.execute(sql, (rooms.pop(),))
+                txn.execute(sql, (next_room,))
                 processed += txn.rowcount
 
+                last_processed_room = next_room
+
             self._background_update_progress_txn(
-                txn, _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, progress
+                txn,
+                _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
+                {"last_processed_room": last_processed_room},
             )
 
-            return processed
+            return processed, False
 
-        result = yield self.runInteraction(
+        # If we don't have a last processed room, use the empty string, which
+        # sorts before all room IDs.
+        last_processed_room = progress.get("last_processed_room", "")
+
+        row_count, finished = yield self.runInteraction(
             "_background_current_state_membership_update",
             _background_current_state_membership_txn,
+            last_processed_room,
         )
 
-        if not rooms:
+        if finished:
             yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)
 
-        defer.returnValue(result)
+        defer.returnValue(row_count)
 
 
 class _JoinedHostsCache(object):
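The rewritten background update stops storing the full list of room IDs in the progress dict and instead keeps a single last_processed_room marker, walking rooms in lexicographic order with SELECT MIN(room_id) ... WHERE room_id > ?. Below is a self-contained sketch of that resumable batching pattern using plain sqlite3 and a made-up schema; it is not Synapse's storage layer, and the per-room UPDATE is only a stand-in.

import sqlite3


def process_batch(conn, last_processed_room, batch_size):
    """Process up to batch_size rooms, resuming after last_processed_room.

    Returns (rows_touched, new_marker, finished).
    """
    cur = conn.cursor()
    processed = 0
    while processed < batch_size:
        # Fetch the next room in lexicographic order after the marker.
        cur.execute(
            "SELECT MIN(room_id) FROM rooms WHERE room_id > ?",
            (last_processed_room,),
        )
        row = cur.fetchone()
        if not row or not row[0]:
            # No rooms left after the marker: the update is complete.
            return processed, last_processed_room, True

        (next_room,) = row
        # Stand-in for the real per-room UPDATE on current_state_events.
        cur.execute(
            "UPDATE current_state_events SET membership = type WHERE room_id = ?",
            (next_room,),
        )
        processed += cur.rowcount
        last_processed_room = next_room

    # Batch is full: the caller persists the marker (in Synapse, via the
    # background_updates progress JSON) and resumes later.
    return processed, last_processed_room, False


conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE rooms (room_id TEXT PRIMARY KEY);
    CREATE TABLE current_state_events (room_id TEXT, type TEXT, membership TEXT);
    INSERT INTO rooms VALUES ('!a:x'), ('!b:x'), ('!c:x');
    INSERT INTO current_state_events VALUES ('!a:x', 'join', NULL), ('!b:x', 'join', NULL);
    """
)

marker, finished = "", False
while not finished:
    touched, marker, finished = process_batch(conn, marker, batch_size=2)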
diff --git a/synapse/util/versionstring.py b/synapse/util/versionstring.py
index a4d9a462f7..fa404b9d75 100644
--- a/synapse/util/versionstring.py
+++ b/synapse/util/versionstring.py
@@ -22,6 +22,23 @@ logger = logging.getLogger(__name__)
 
 
 def get_version_string(module):
+    """Given a module calculate a git-aware version string for it.
+
+    If called on a module not in a git checkout will return `__verison__`.
+
+    Args:
+        module (module)
+
+    Returns:
+        str
+    """
+
+    cached_version = getattr(module, "_synapse_version_string_cache", None)
+    if cached_version:
+        return cached_version
+
+    version_string = module.__version__
+
     try:
         null = open(os.devnull, "w")
         cwd = os.path.dirname(os.path.abspath(module.__file__))
@@ -80,8 +97,10 @@ def get_version_string(module):
                 s for s in (git_branch, git_tag, git_commit, git_dirty) if s
             )
 
-            return "%s (%s)" % (module.__version__, git_version)
+            version_string = "%s (%s)" % (module.__version__, git_version)
     except Exception as e:
         logger.info("Failed to check for git repository: %s", e)
 
-    return module.__version__
+    module._synapse_version_string_cache = version_string
+
+    return version_string
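The new _synapse_version_string_cache attribute means the git subprocess probing runs at most once per module per process; later calls return the stored string. Below is a small sketch of just the caching skeleton; the git detection in the middle is elided and the fake module is purely illustrative.

import types


def get_version_string(module):
    # Caching skeleton mirroring the diff above; the git branch/tag/commit
    # detection that would normally enrich version_string is elided.
    cached_version = getattr(module, "_synapse_version_string_cache", None)
    if cached_version:
        return cached_version

    version_string = module.__version__
    # ... on success, the git probing would overwrite version_string with
    # something like "1.2.0rc1 (<git info>)" ...

    module._synapse_version_string_cache = version_string
    return version_string


fake = types.ModuleType("fake_module")
fake.__version__ = "1.2.0rc1"

print(get_version_string(fake))  # computed on the first call
print(get_version_string(fake))  # served from the per-module cache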