diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/__init__.py | 2 | ||||
-rw-r--r-- | synapse/config/workers.py | 1 | ||||
-rw-r--r-- | synapse/crypto/keyring.py | 92 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 6 | ||||
-rw-r--r-- | synapse/storage/roommember.py | 48 | ||||
-rw-r--r-- | synapse/util/versionstring.py | 23 |
6 files changed, 97 insertions, 75 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py index cf22fabd61..f26e49da36 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -35,4 +35,4 @@ try: except ImportError: pass -__version__ = "1.1.0" +__version__ = "1.2.0rc1" diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 246d72cd61..bc0fc165e3 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -31,6 +31,7 @@ class WorkerConfig(Config): self.worker_listeners = config.get("worker_listeners", []) self.worker_daemonize = config.get("worker_daemonize") self.worker_pid_file = config.get("worker_pid_file") + self.worker_log_config = config.get("worker_log_config") # The host used to connect to the main synapse self.worker_replication_host = config.get("worker_replication_host", None) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 341c863152..e8bb420ad1 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -238,27 +238,9 @@ class Keyring(object): """ try: - # create a deferred for each server we're going to look up the keys - # for; we'll resolve them once we have completed our lookups. - # These will be passed into wait_for_previous_lookups to block - # any other lookups until we have finished. - # The deferreds are called with no logcontext. - server_to_deferred = { - rq.server_name: defer.Deferred() for rq in verify_requests - } - - # We want to wait for any previous lookups to complete before - # proceeding. - yield self.wait_for_previous_lookups(server_to_deferred) + ctx = LoggingContext.current_context() - # Actually start fetching keys. - self._get_server_verify_keys(verify_requests) - - # When we've finished fetching all the keys for a given server_name, - # resolve the deferred passed to `wait_for_previous_lookups` so that - # any lookups waiting will proceed. - # - # map from server name to a set of request ids + # map from server name to a set of outstanding request ids server_to_request_ids = {} for verify_request in verify_requests: @@ -266,40 +248,61 @@ class Keyring(object): request_id = id(verify_request) server_to_request_ids.setdefault(server_name, set()).add(request_id) - def remove_deferreds(res, verify_request): + # Wait for any previous lookups to complete before proceeding. + yield self.wait_for_previous_lookups(server_to_request_ids.keys()) + + # take out a lock on each of the servers by sticking a Deferred in + # key_downloads + for server_name in server_to_request_ids.keys(): + self.key_downloads[server_name] = defer.Deferred() + logger.debug("Got key lookup lock on %s", server_name) + + # When we've finished fetching all the keys for a given server_name, + # drop the lock by resolving the deferred in key_downloads. + def drop_server_lock(server_name): + d = self.key_downloads.pop(server_name) + d.callback(None) + + def lookup_done(res, verify_request): server_name = verify_request.server_name - request_id = id(verify_request) - server_to_request_ids[server_name].discard(request_id) - if not server_to_request_ids[server_name]: - d = server_to_deferred.pop(server_name, None) - if d: - d.callback(None) + server_requests = server_to_request_ids[server_name] + server_requests.remove(id(verify_request)) + + # if there are no more requests for this server, we can drop the lock. + if not server_requests: + with PreserveLoggingContext(ctx): + logger.debug("Releasing key lookup lock on %s", server_name) + + # ... but not immediately, as that can cause stack explosions if + # we get a long queue of lookups. + self.clock.call_later(0, drop_server_lock, server_name) + return res for verify_request in verify_requests: - verify_request.key_ready.addBoth(remove_deferreds, verify_request) + verify_request.key_ready.addBoth(lookup_done, verify_request) + + # Actually start fetching keys. + self._get_server_verify_keys(verify_requests) except Exception: logger.exception("Error starting key lookups") @defer.inlineCallbacks - def wait_for_previous_lookups(self, server_to_deferred): + def wait_for_previous_lookups(self, server_names): """Waits for any previous key lookups for the given servers to finish. Args: - server_to_deferred (dict[str, Deferred]): server_name to deferred which gets - resolved once we've finished looking up keys for that server. - The Deferreds should be regular twisted ones which call their - callbacks with no logcontext. - - Returns: a Deferred which resolves once all key lookups for the given - servers have completed. Follows the synapse rules of logcontext - preservation. + server_names (Iterable[str]): list of servers which we want to look up + + Returns: + Deferred[None]: resolves once all key lookups for the given servers have + completed. Follows the synapse rules of logcontext preservation. """ loop_count = 1 while True: wait_on = [ (server_name, self.key_downloads[server_name]) - for server_name in server_to_deferred.keys() + for server_name in server_names if server_name in self.key_downloads ] if not wait_on: @@ -314,19 +317,6 @@ class Keyring(object): loop_count += 1 - ctx = LoggingContext.current_context() - - def rm(r, server_name_): - with PreserveLoggingContext(ctx): - logger.debug("Releasing key lookup lock on %s", server_name_) - self.key_downloads.pop(server_name_, None) - return r - - for server_name, deferred in server_to_deferred.items(): - logger.debug("Got key lookup lock on %s", server_name) - self.key_downloads[server_name] = deferred - deferred.addBoth(rm, server_name) - def _get_server_verify_keys(self, verify_requests): """Tries to find at least one key for each verify request diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index 6b0ca80087..86a333a919 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -469,7 +469,7 @@ class DataStore( return self._simple_select_list( table="users", keyvalues={}, - retcols=["name", "password_hash", "is_guest", "admin"], + retcols=["name", "password_hash", "is_guest", "admin", "user_type"], desc="get_users", ) @@ -494,7 +494,7 @@ class DataStore( orderby=order, start=start, limit=limit, - retcols=["name", "password_hash", "is_guest", "admin"], + retcols=["name", "password_hash", "is_guest", "admin", "user_type"], ) count = yield self.runInteraction("get_users_paginate", self.get_user_count_txn) retval = {"users": users, "total": count} @@ -514,7 +514,7 @@ class DataStore( table="users", term=term, col="name", - retcols=["name", "password_hash", "is_guest", "admin"], + retcols=["name", "password_hash", "is_guest", "admin", "user_type"], desc="search_users", ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 257bcdb2f8..b3c002b9eb 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -852,22 +852,25 @@ class RoomMemberStore(RoomMemberWorkerStore): @defer.inlineCallbacks def _background_current_state_membership(self, progress, batch_size): """Update the new membership column on current_state_events. + + This works by iterating over all rooms in alphebetical order. """ - if "rooms" not in progress: - rooms = yield self._simple_select_onecol( - table="current_state_events", - keyvalues={}, - retcol="DISTINCT room_id", - desc="_background_current_state_membership_get_rooms", - ) - progress["rooms"] = rooms + def _background_current_state_membership_txn(txn, last_processed_room): + processed = 0 + while processed < batch_size: + txn.execute( + """ + SELECT MIN(room_id) FROM rooms WHERE room_id > ? + """, + (last_processed_room,), + ) + row = txn.fetchone() + if not row or not row[0]: + return processed, True - rooms = progress["rooms"] + next_room, = row - def _background_current_state_membership_txn(txn): - processed = 0 - while rooms and processed < batch_size: sql = """ UPDATE current_state_events AS c SET membership = ( @@ -876,24 +879,33 @@ class RoomMemberStore(RoomMemberWorkerStore): ) WHERE room_id = ? """ - txn.execute(sql, (rooms.pop(),)) + txn.execute(sql, (next_room,)) processed += txn.rowcount + last_processed_room = next_room + self._background_update_progress_txn( - txn, _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, progress + txn, + _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, + {"last_processed_room": last_processed_room}, ) - return processed + return processed, False - result = yield self.runInteraction( + # If we haven't got a last processed room then just use the empty + # string, which will compare before all room IDs correctly. + last_processed_room = progress.get("last_processed_room", "") + + row_count, finished = yield self.runInteraction( "_background_current_state_membership_update", _background_current_state_membership_txn, + last_processed_room, ) - if not rooms: + if finished: yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME) - defer.returnValue(result) + defer.returnValue(row_count) class _JoinedHostsCache(object): diff --git a/synapse/util/versionstring.py b/synapse/util/versionstring.py index a4d9a462f7..fa404b9d75 100644 --- a/synapse/util/versionstring.py +++ b/synapse/util/versionstring.py @@ -22,6 +22,23 @@ logger = logging.getLogger(__name__) def get_version_string(module): + """Given a module calculate a git-aware version string for it. + + If called on a module not in a git checkout will return `__verison__`. + + Args: + module (module) + + Returns: + str + """ + + cached_version = getattr(module, "_synapse_version_string_cache", None) + if cached_version: + return cached_version + + version_string = module.__version__ + try: null = open(os.devnull, "w") cwd = os.path.dirname(os.path.abspath(module.__file__)) @@ -80,8 +97,10 @@ def get_version_string(module): s for s in (git_branch, git_tag, git_commit, git_dirty) if s ) - return "%s (%s)" % (module.__version__, git_version) + version_string = "%s (%s)" % (module.__version__, git_version) except Exception as e: logger.info("Failed to check for git repository: %s", e) - return module.__version__ + module._synapse_version_string_cache = version_string + + return version_string |