diff --git a/synapse/__init__.py b/synapse/__init__.py
index cf22fabd61..f26e49da36 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -35,4 +35,4 @@ try:
except ImportError:
pass
-__version__ = "1.1.0"
+__version__ = "1.2.0rc1"
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 246d72cd61..bc0fc165e3 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -31,6 +31,7 @@ class WorkerConfig(Config):
self.worker_listeners = config.get("worker_listeners", [])
self.worker_daemonize = config.get("worker_daemonize")
self.worker_pid_file = config.get("worker_pid_file")
+ self.worker_log_config = config.get("worker_log_config")
# The host used to connect to the main synapse
self.worker_replication_host = config.get("worker_replication_host", None)
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 341c863152..e8bb420ad1 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -238,27 +238,9 @@ class Keyring(object):
"""
try:
- # create a deferred for each server we're going to look up the keys
- # for; we'll resolve them once we have completed our lookups.
- # These will be passed into wait_for_previous_lookups to block
- # any other lookups until we have finished.
- # The deferreds are called with no logcontext.
- server_to_deferred = {
- rq.server_name: defer.Deferred() for rq in verify_requests
- }
-
- # We want to wait for any previous lookups to complete before
- # proceeding.
- yield self.wait_for_previous_lookups(server_to_deferred)
+ ctx = LoggingContext.current_context()
- # Actually start fetching keys.
- self._get_server_verify_keys(verify_requests)
-
- # When we've finished fetching all the keys for a given server_name,
- # resolve the deferred passed to `wait_for_previous_lookups` so that
- # any lookups waiting will proceed.
- #
- # map from server name to a set of request ids
+ # map from server name to a set of outstanding request ids
server_to_request_ids = {}
for verify_request in verify_requests:
@@ -266,40 +248,61 @@ class Keyring(object):
request_id = id(verify_request)
server_to_request_ids.setdefault(server_name, set()).add(request_id)
- def remove_deferreds(res, verify_request):
+ # Wait for any previous lookups to complete before proceeding.
+ yield self.wait_for_previous_lookups(server_to_request_ids.keys())
+
+ # take out a lock on each of the servers by sticking a Deferred in
+ # key_downloads
+ for server_name in server_to_request_ids.keys():
+ self.key_downloads[server_name] = defer.Deferred()
+ logger.debug("Got key lookup lock on %s", server_name)
+
+ # When we've finished fetching all the keys for a given server_name,
+ # drop the lock by resolving the deferred in key_downloads.
+ def drop_server_lock(server_name):
+ d = self.key_downloads.pop(server_name)
+ d.callback(None)
+
+ def lookup_done(res, verify_request):
server_name = verify_request.server_name
- request_id = id(verify_request)
- server_to_request_ids[server_name].discard(request_id)
- if not server_to_request_ids[server_name]:
- d = server_to_deferred.pop(server_name, None)
- if d:
- d.callback(None)
+ server_requests = server_to_request_ids[server_name]
+ server_requests.remove(id(verify_request))
+
+ # if there are no more requests for this server, we can drop the lock.
+ if not server_requests:
+ with PreserveLoggingContext(ctx):
+ logger.debug("Releasing key lookup lock on %s", server_name)
+
+ # ... but not immediately, as that can cause stack explosions if
+ # we get a long queue of lookups.
+ self.clock.call_later(0, drop_server_lock, server_name)
+
return res
for verify_request in verify_requests:
- verify_request.key_ready.addBoth(remove_deferreds, verify_request)
+ verify_request.key_ready.addBoth(lookup_done, verify_request)
+
+ # Actually start fetching keys.
+ self._get_server_verify_keys(verify_requests)
except Exception:
logger.exception("Error starting key lookups")
@defer.inlineCallbacks
- def wait_for_previous_lookups(self, server_to_deferred):
+ def wait_for_previous_lookups(self, server_names):
"""Waits for any previous key lookups for the given servers to finish.
Args:
- server_to_deferred (dict[str, Deferred]): server_name to deferred which gets
- resolved once we've finished looking up keys for that server.
- The Deferreds should be regular twisted ones which call their
- callbacks with no logcontext.
-
- Returns: a Deferred which resolves once all key lookups for the given
- servers have completed. Follows the synapse rules of logcontext
- preservation.
+ server_names (Iterable[str]): list of servers which we want to look up
+
+ Returns:
+ Deferred[None]: resolves once all key lookups for the given servers have
+ completed. Follows the synapse rules of logcontext preservation.
"""
loop_count = 1
while True:
wait_on = [
(server_name, self.key_downloads[server_name])
- for server_name in server_to_deferred.keys()
+ for server_name in server_names
if server_name in self.key_downloads
]
if not wait_on:
@@ -314,19 +317,6 @@ class Keyring(object):
loop_count += 1
- ctx = LoggingContext.current_context()
-
- def rm(r, server_name_):
- with PreserveLoggingContext(ctx):
- logger.debug("Releasing key lookup lock on %s", server_name_)
- self.key_downloads.pop(server_name_, None)
- return r
-
- for server_name, deferred in server_to_deferred.items():
- logger.debug("Got key lookup lock on %s", server_name)
- self.key_downloads[server_name] = deferred
- deferred.addBoth(rm, server_name)
-
def _get_server_verify_keys(self, verify_requests):
"""Tries to find at least one key for each verify request
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 6b0ca80087..86a333a919 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -469,7 +469,7 @@ class DataStore(
return self._simple_select_list(
table="users",
keyvalues={},
- retcols=["name", "password_hash", "is_guest", "admin"],
+ retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
desc="get_users",
)
@@ -494,7 +494,7 @@ class DataStore(
orderby=order,
start=start,
limit=limit,
- retcols=["name", "password_hash", "is_guest", "admin"],
+ retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
)
count = yield self.runInteraction("get_users_paginate", self.get_user_count_txn)
retval = {"users": users, "total": count}
@@ -514,7 +514,7 @@ class DataStore(
table="users",
term=term,
col="name",
- retcols=["name", "password_hash", "is_guest", "admin"],
+ retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
desc="search_users",
)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 257bcdb2f8..b3c002b9eb 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -852,22 +852,25 @@ class RoomMemberStore(RoomMemberWorkerStore):
@defer.inlineCallbacks
def _background_current_state_membership(self, progress, batch_size):
"""Update the new membership column on current_state_events.
+
+ This works by iterating over all rooms in alphebetical order.
"""
- if "rooms" not in progress:
- rooms = yield self._simple_select_onecol(
- table="current_state_events",
- keyvalues={},
- retcol="DISTINCT room_id",
- desc="_background_current_state_membership_get_rooms",
- )
- progress["rooms"] = rooms
+ def _background_current_state_membership_txn(txn, last_processed_room):
+ processed = 0
+ while processed < batch_size:
+ txn.execute(
+ """
+ SELECT MIN(room_id) FROM rooms WHERE room_id > ?
+ """,
+ (last_processed_room,),
+ )
+ row = txn.fetchone()
+ if not row or not row[0]:
+ return processed, True
- rooms = progress["rooms"]
+ next_room, = row
- def _background_current_state_membership_txn(txn):
- processed = 0
- while rooms and processed < batch_size:
sql = """
UPDATE current_state_events AS c
SET membership = (
@@ -876,24 +879,33 @@ class RoomMemberStore(RoomMemberWorkerStore):
)
WHERE room_id = ?
"""
- txn.execute(sql, (rooms.pop(),))
+ txn.execute(sql, (next_room,))
processed += txn.rowcount
+ last_processed_room = next_room
+
self._background_update_progress_txn(
- txn, _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME, progress
+ txn,
+ _CURRENT_STATE_MEMBERSHIP_UPDATE_NAME,
+ {"last_processed_room": last_processed_room},
)
- return processed
+ return processed, False
- result = yield self.runInteraction(
+ # If we haven't got a last processed room then just use the empty
+ # string, which will compare before all room IDs correctly.
+ last_processed_room = progress.get("last_processed_room", "")
+
+ row_count, finished = yield self.runInteraction(
"_background_current_state_membership_update",
_background_current_state_membership_txn,
+ last_processed_room,
)
- if not rooms:
+ if finished:
yield self._end_background_update(_CURRENT_STATE_MEMBERSHIP_UPDATE_NAME)
- defer.returnValue(result)
+ defer.returnValue(row_count)
class _JoinedHostsCache(object):
diff --git a/synapse/util/versionstring.py b/synapse/util/versionstring.py
index a4d9a462f7..fa404b9d75 100644
--- a/synapse/util/versionstring.py
+++ b/synapse/util/versionstring.py
@@ -22,6 +22,23 @@ logger = logging.getLogger(__name__)
def get_version_string(module):
+ """Given a module calculate a git-aware version string for it.
+
+ If called on a module not in a git checkout will return `__verison__`.
+
+ Args:
+ module (module)
+
+ Returns:
+ str
+ """
+
+ cached_version = getattr(module, "_synapse_version_string_cache", None)
+ if cached_version:
+ return cached_version
+
+ version_string = module.__version__
+
try:
null = open(os.devnull, "w")
cwd = os.path.dirname(os.path.abspath(module.__file__))
@@ -80,8 +97,10 @@ def get_version_string(module):
s for s in (git_branch, git_tag, git_commit, git_dirty) if s
)
- return "%s (%s)" % (module.__version__, git_version)
+ version_string = "%s (%s)" % (module.__version__, git_version)
except Exception as e:
logger.info("Failed to check for git repository: %s", e)
- return module.__version__
+ module._synapse_version_string_cache = version_string
+
+ return version_string
|