diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index bc629832d9..073229b4c4 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -65,8 +65,9 @@ class Auth(object):
@defer.inlineCallbacks
def check_from_context(self, event, context, do_sig_check=True):
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -544,7 +545,8 @@ class Auth(object):
@defer.inlineCallbacks
def add_auth_events(self, builder, context):
- auth_ids = yield self.compute_auth_events(builder, context.prev_state_ids)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ auth_ids = yield self.compute_auth_events(builder, prev_state_ids)
auth_events_entries = yield self.store.add_event_hashes(
auth_ids
@@ -737,3 +739,37 @@ class Auth(object):
)
return query_params[0]
+
+ @defer.inlineCallbacks
+ def check_in_room_or_world_readable(self, room_id, user_id):
+ """Checks that the user is or was in the room or the room is world
+ readable. If it isn't then an exception is raised.
+
+ Returns:
+ Deferred[tuple[str, str|None]]: Resolves to the current membership of
+ the user in the room and the membership event ID of the user. If
+ the user is not in the room and never has been, then
+ `(Membership.JOIN, None)` is returned.
+ """
+
+ try:
+ # check_user_was_in_room will return the most recent membership
+ # event for the user if:
+ # * The user is a non-guest user, and was ever in the room
+ # * The user is a guest user, and has joined the room
+ # else it will raise.
+ member_event = yield self.check_user_was_in_room(room_id, user_id)
+ defer.returnValue((member_event.membership, member_event.event_id))
+ except AuthError:
+ visibility = yield self.state.get_current_state(
+ room_id, EventTypes.RoomHistoryVisibility, ""
+ )
+ if (
+ visibility and
+ visibility.content["history_visibility"] == "world_readable"
+ ):
+ defer.returnValue((Membership.JOIN, None))
+ return
+ raise AuthError(
+ 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
+ )
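
A minimal sketch of the calling convention for the new helper (the
handler class and method shown are illustrative, not part of the patch):

    from twisted.internet import defer

    from synapse.api.constants import Membership


    class ExampleHandler(object):
        def __init__(self, hs):
            self.auth = hs.get_auth()

        @defer.inlineCallbacks
        def summarise_access(self, room_id, user_id):
            # Raises AuthError unless the user is/was in the room or the
            # room is world_readable.
            membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
                room_id, user_id,
            )
            # member_event_id is None when peeking at a world_readable room
            is_peeking = member_event_id is None
            defer.returnValue((membership == Membership.JOIN, is_peeking))
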
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index b0ea26dcb4..e2c91123db 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -31,6 +31,7 @@ from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.directory import DirectoryStore
@@ -40,7 +41,13 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.rest.client.v1.room import PublicRoomListRestServlet
+from synapse.rest.client.v1.room import (
+ JoinedRoomMemberListRestServlet,
+ PublicRoomListRestServlet,
+ RoomEventContextServlet,
+ RoomMemberListRestServlet,
+ RoomStateRestServlet,
+)
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
@@ -52,6 +59,7 @@ logger = logging.getLogger("synapse.app.client_reader")
class ClientReaderSlavedStore(
+ SlavedAccountDataStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
@@ -82,7 +90,13 @@ class ClientReaderServer(HomeServer):
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
+
PublicRoomListRestServlet(self).register(resource)
+ RoomMemberListRestServlet(self).register(resource)
+ JoinedRoomMemberListRestServlet(self).register(resource)
+ RoomStateRestServlet(self).register(resource)
+ RoomEventContextServlet(self).register(resource)
+
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index b5f78f4640..0d9abf3a24 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -38,6 +38,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1.base import ClientV1RestServlet, client_path_patterns
from synapse.rest.client.v2_alpha._base import client_v2_patterns
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -49,6 +50,35 @@ from synapse.util.versionstring import get_version_string
logger = logging.getLogger("synapse.app.frontend_proxy")
+class PresenceStatusStubServlet(ClientV1RestServlet):
+ PATTERNS = client_path_patterns("/presence/(?P<user_id>[^/]*)/status")
+
+ def __init__(self, hs):
+ super(PresenceStatusStubServlet, self).__init__(hs)
+ self.http_client = hs.get_simple_http_client()
+ self.auth = hs.get_auth()
+ self.main_uri = hs.config.worker_main_http_uri
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, user_id):
+ # Pass through the auth headers, if any, in case the access token
+ # is there.
+ auth_headers = request.requestHeaders.getRawHeaders("Authorization", [])
+ headers = {
+ "Authorization": auth_headers,
+ }
+ result = yield self.http_client.get_json(
+ self.main_uri + request.uri,
+ headers=headers,
+ )
+ defer.returnValue((200, result))
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, user_id):
+ yield self.auth.get_user_by_req(request)
+ defer.returnValue((200, {}))
+
+
class KeyUploadServlet(RestServlet):
PATTERNS = client_v2_patterns("/keys/upload(/(?P<device_id>[^/]+))?$")
@@ -135,6 +165,7 @@ class FrontendProxyServer(HomeServer):
elif name == "client":
resource = JsonResource(self, canonical_json=False)
KeyUploadServlet(self).register(resource)
+ PresenceStatusStubServlet(self).register(resource)
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 14e6dca522..2ad1beb8d8 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -18,6 +18,8 @@ import logging
import os
import sys
+from six import iteritems
+
from twisted.application import service
from twisted.internet import defer, reactor
from twisted.web.resource import EncodingResourceWrapper, NoResource
@@ -442,7 +444,7 @@ def run(hs):
stats["total_nonbridged_users"] = total_nonbridged_users
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
- for name, count in daily_user_type_results.iteritems():
+ for name, count in iteritems(daily_user_type_results):
stats["daily_user_type_" + name] = count
room_count = yield hs.get_datastore().get_room_count()
@@ -453,7 +455,7 @@ def run(hs):
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
r30_results = yield hs.get_datastore().count_r30_users()
- for name, count in r30_results.iteritems():
+ for name, count in iteritems(r30_results):
stats["r30_users_" + name] = count
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
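
The `iteritems(d)` helper from six dispatches to `d.iteritems()` on
Python 2 and `d.items()` on Python 3, so the loops above work unchanged
on both. A self-contained example:

    from six import iteritems

    counts = {"native": 3, "bridged": 1}
    for name, count in iteritems(counts):
        print(name, count)
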
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 26b9ec85f2..6178080b72 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -55,7 +55,6 @@ from synapse.rest.client.v2_alpha import sync
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
-from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
@@ -81,9 +80,7 @@ class SynchrotronSlavedStore(
RoomStore,
BaseSlavedStore,
):
- did_forget = (
- RoomMemberStore.__dict__["did_forget"]
- )
+ pass
UPDATE_SYNCING_USERS_MS = 10 * 1000
@@ -117,6 +114,7 @@ class SynchrotronPresence(object):
logger.info("Presence process_id is %r", self.process_id)
def send_user_sync(self, user_id, is_syncing, last_sync_ms):
+ return
self.hs.get_tcp_replication().send_user_sync(user_id, is_syncing, last_sync_ms)
def mark_as_coming_online(self, user_id):
@@ -214,6 +212,8 @@ class SynchrotronPresence(object):
yield self.notify_from_replication(states, stream_id)
def get_currently_syncing_users(self):
+ # presence is disabled on matrix.org, so we return the empty set
+ return set()
return [
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 68acc15a9a..356e5cb6a7 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -25,6 +25,8 @@ import subprocess
import sys
import time
+from six import iteritems
+
import yaml
SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
@@ -109,7 +111,7 @@ def stop(pidfile, app):
Worker = collections.namedtuple("Worker", [
- "app", "configfile", "pidfile", "cache_factor"
+ "app", "configfile", "pidfile", "cache_factor", "cache_factors",
])
@@ -173,7 +175,7 @@ def main():
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
cache_factors = config.get("synctl_cache_factors", {})
- for cache_name, factor in cache_factors.iteritems():
+ for cache_name, factor in iteritems(cache_factors):
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
worker_configfiles = []
@@ -216,6 +218,10 @@ def main():
or pidfile
)
worker_cache_factor = worker_config.get("synctl_cache_factor") or cache_factor
+ worker_cache_factors = (
+ worker_config.get("synctl_cache_factors")
+ or cache_factors
+ )
daemonize = worker_config.get("daemonize") or config.get("daemonize")
assert daemonize, "Main process must have daemonize set to true"
@@ -231,8 +237,10 @@ def main():
assert worker_daemonize, "In config %r: expected '%s' to be True" % (
worker_configfile, "worker_daemonize")
worker_cache_factor = worker_config.get("synctl_cache_factor")
+ worker_cache_factors = worker_config.get("synctl_cache_factors", {})
workers.append(Worker(
worker_app, worker_configfile, worker_pidfile, worker_cache_factor,
+ worker_cache_factors,
))
action = options.action
@@ -267,15 +275,19 @@ def main():
start(configfile)
for worker in workers:
+ env = os.environ.copy()
+
if worker.cache_factor:
os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)
+ for cache_name, factor in iteritems(worker.cache_factors):
+ os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
+
start_worker(worker.app, configfile, worker.configfile)
- if cache_factor:
- os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
- else:
- os.environ.pop("SYNAPSE_CACHE_FACTOR", None)
+ # Reset env back to the original
+ os.environ.clear()
+ os.environ.update(env)
if __name__ == "__main__":
diff --git a/synapse/config/voip.py b/synapse/config/voip.py
index 3a4e16fa96..d07bd24ffd 100644
--- a/synapse/config/voip.py
+++ b/synapse/config/voip.py
@@ -30,10 +30,10 @@ class VoipConfig(Config):
## Turn ##
# The public URIs of the TURN server to give to clients
- turn_uris: []
+ #turn_uris: []
# The shared secret used to compute passwords for the TURN server
- turn_shared_secret: "YOUR_SHARED_SECRET"
+ #turn_shared_secret: "YOUR_SHARED_SECRET"
# The Username and password if the TURN server needs them and
# does not use a token
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index bcd9bb5946..368b5f6ae4 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -13,22 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from six import iteritems
+
from frozendict import frozendict
from twisted.internet import defer
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+
class EventContext(object):
"""
Attributes:
- current_state_ids (dict[(str, str), str]):
- The current state map including the current event.
- (type, state_key) -> event_id
-
- prev_state_ids (dict[(str, str), str]):
- The current state map excluding the current event.
- (type, state_key) -> event_id
-
state_group (int|None): state group id, if the state has been stored
as a state group. This is usually only None if e.g. the event is
an outlier.
@@ -45,38 +41,77 @@ class EventContext(object):
prev_state_events (?): XXX: is this ever set to anything other than
the empty list?
+
+ _current_state_ids (dict[(str, str), str]|None):
+ The current state map including the current event. None if outlier
+ or we haven't fetched the state from DB yet.
+ (type, state_key) -> event_id
+
+ _prev_state_ids (dict[(str, str), str]|None):
+ The current state map excluding the current event. None if outlier
+ or we haven't fetched the state from DB yet.
+ (type, state_key) -> event_id
+
+ _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have
+ been calculated. None if we haven't started calculating yet
+
+ _event_type (str): The type of the event the context is associated with.
+ Only set when state has not been fetched yet.
+
+ _event_state_key (str|None): The state_key of the event the context is
+ associated with. Only set when state has not been fetched yet.
+
+ _prev_state_id (str|None): If the event associated with the context is
+ a state event, then `_prev_state_id` is the event_id of the state
+ that was replaced.
+ Only set when state has not been fetched yet.
"""
__slots__ = [
- "current_state_ids",
- "prev_state_ids",
"state_group",
"rejected",
"prev_group",
"delta_ids",
"prev_state_events",
"app_service",
+ "_current_state_ids",
+ "_prev_state_ids",
+ "_prev_state_id",
+ "_event_type",
+ "_event_state_key",
+ "_fetching_state_deferred",
]
def __init__(self):
+ self.prev_state_events = []
+ self.rejected = False
+ self.app_service = None
+
+ @staticmethod
+ def with_state(state_group, current_state_ids, prev_state_ids,
+ prev_group=None, delta_ids=None):
+ context = EventContext()
+
# The current state including the current event
- self.current_state_ids = None
+ context._current_state_ids = current_state_ids
# The current state excluding the current event
- self.prev_state_ids = None
- self.state_group = None
+ context._prev_state_ids = prev_state_ids
+ context.state_group = state_group
- self.rejected = False
+ context._prev_state_id = None
+ context._event_type = None
+ context._event_state_key = None
+ context._fetching_state_deferred = defer.succeed(None)
# A previously persisted state group and a delta between that
# and this state.
- self.prev_group = None
- self.delta_ids = None
+ context.prev_group = prev_group
+ context.delta_ids = delta_ids
- self.prev_state_events = None
-
- self.app_service = None
+ return context
- def serialize(self, event):
+ @defer.inlineCallbacks
+ def serialize(self, event, store):
"""Converts self to a type that can be serialized as JSON, and then
deserialized by `deserialize`
@@ -92,11 +127,12 @@ class EventContext(object):
# the prev_state_ids, so if we're a state event we include the event
# id that we replaced in the state.
if event.is_state():
- prev_state_id = self.prev_state_ids.get((event.type, event.state_key))
+ prev_state_ids = yield self.get_prev_state_ids(store)
+ prev_state_id = prev_state_ids.get((event.type, event.state_key))
else:
prev_state_id = None
- return {
+ defer.returnValue({
"prev_state_id": prev_state_id,
"event_type": event.type,
"event_state_key": event.state_key if event.is_state() else None,
@@ -106,10 +142,9 @@ class EventContext(object):
"delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events,
"app_service_id": self.app_service.id if self.app_service else None
- }
+ })
@staticmethod
- @defer.inlineCallbacks
def deserialize(store, input):
"""Converts a dict that was produced by `serialize` back into a
EventContext.
@@ -122,32 +157,115 @@ class EventContext(object):
EventContext
"""
context = EventContext()
+
+ # We use the state_group and prev_state_id stuff to pull the
+ # current_state_ids out of the DB and construct prev_state_ids.
+ context._prev_state_id = input["prev_state_id"]
+ context._event_type = input["event_type"]
+ context._event_state_key = input["event_state_key"]
+
+ context._current_state_ids = None
+ context._prev_state_ids = None
+ context._fetching_state_deferred = None
+
context.state_group = input["state_group"]
- context.rejected = input["rejected"]
context.prev_group = input["prev_group"]
context.delta_ids = _decode_state_dict(input["delta_ids"])
+
+ context.rejected = input["rejected"]
context.prev_state_events = input["prev_state_events"]
- # We use the state_group and prev_state_id stuff to pull the
- # current_state_ids out of the DB and construct prev_state_ids.
- prev_state_id = input["prev_state_id"]
- event_type = input["event_type"]
- event_state_key = input["event_state_key"]
+ app_service_id = input["app_service_id"]
+ if app_service_id:
+ context.app_service = store.get_app_service_by_id(app_service_id)
+
+ return context
+
+ @defer.inlineCallbacks
+ def get_current_state_ids(self, store):
+ """Gets the current state IDs
+
+ Returns:
+ Deferred[dict[(str, str), str]|None]: Returns None if state_group
+ is None, which happens when the associated event is an outlier.
+ """
+
+ if not self._fetching_state_deferred:
+ self._fetching_state_deferred = run_in_background(
+ self._fill_out_state, store,
+ )
+
+ yield make_deferred_yieldable(self._fetching_state_deferred)
+
+ defer.returnValue(self._current_state_ids)
+
+ @defer.inlineCallbacks
+ def get_prev_state_ids(self, store):
+ """Gets the prev state IDs
+
+ Returns:
+ Deferred[dict[(str, str), str]|None]: Returns None if state_group
+ is None, which happens when the associated event is an outlier.
+ """
+
+ if not self._fetching_state_deferred:
+ self._fetching_state_deferred = run_in_background(
+ self._fill_out_state, store,
+ )
+
+ yield make_deferred_yieldable(self._fetching_state_deferred)
- context.current_state_ids = yield store.get_state_ids_for_group(
- context.state_group,
+ defer.returnValue(self._prev_state_ids)
+
+ def get_cached_current_state_ids(self):
+ """Gets the current state IDs if we have them already cached.
+
+ Returns:
+ dict[(str, str), str]|None: Returns None if we haven't cached the
+ state or if state_group is None, which happens when the associated
+ event is an outlier.
+ """
+
+ return self._current_state_ids
+
+ @defer.inlineCallbacks
+ def _fill_out_state(self, store):
+ """Called to populate the _current_state_ids and _prev_state_ids
+ attributes by loading from the database.
+ """
+ if self.state_group is None:
+ return
+
+ self._current_state_ids = yield store.get_state_ids_for_group(
+ self.state_group,
)
- if prev_state_id and event_state_key:
- context.prev_state_ids = dict(context.current_state_ids)
- context.prev_state_ids[(event_type, event_state_key)] = prev_state_id
+ if self._prev_state_id and self._event_state_key is not None:
+ self._prev_state_ids = dict(self._current_state_ids)
+
+ key = (self._event_type, self._event_state_key)
+ self._prev_state_ids[key] = self._prev_state_id
else:
- context.prev_state_ids = context.current_state_ids
+ self._prev_state_ids = self._current_state_ids
- app_service_id = input["app_service_id"]
- if app_service_id:
- context.app_service = store.get_app_service_by_id(app_service_id)
+ @defer.inlineCallbacks
+ def update_state(self, state_group, prev_state_ids, current_state_ids,
+ prev_group, delta_ids):
+ """Replace the state in the context
+ """
+
+ # We need to make sure we wait for any ongoing fetching of state
+ # to complete so that the updated state doesn't get clobbered
+ if self._fetching_state_deferred:
+ yield make_deferred_yieldable(self._fetching_state_deferred)
+
+ self.state_group = state_group
+ self._prev_state_ids = prev_state_ids
+ self.prev_group = prev_group
+ self._current_state_ids = current_state_ids
+ self.delta_ids = delta_ids
- defer.returnValue(context)
+ # We need to ensure that we've marked the state as having been fetched
+ self._fetching_state_deferred = defer.succeed(None)
def _encode_state_dict(state_dict):
@@ -159,7 +277,7 @@ def _encode_state_dict(state_dict):
return [
(etype, state_key, v)
- for (etype, state_key), v in state_dict.iteritems()
+ for (etype, state_key), v in iteritems(state_dict)
]
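
With this change, callers no longer read `context.prev_state_ids` or
`context.current_state_ids` directly; they go through accessors that
lazily load the state from the database on first use and cache it on
the context. A sketch of the new calling convention (the free function
here is illustrative):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def compute_auth_ids(auth, store, event, context):
        # The first call triggers _fill_out_state and caches the result;
        # later calls on the same context return the cached maps.
        prev_state_ids = yield context.get_prev_state_ids(store)
        auth_event_ids = yield auth.compute_auth_events(
            event, prev_state_ids, for_verification=True,
        )
        defer.returnValue(auth_event_ids)
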
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 10e71c78ce..015ef6334a 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -24,6 +24,7 @@ from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
+from twisted.python import failure
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError
@@ -186,8 +187,12 @@ class FederationServer(FederationBase):
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
except Exception as e:
+ f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
- logger.exception("Failed to handle PDU %s", event_id)
+ logger.error(
+ "Failed to handle PDU %s: %s",
+ event_id, f.getTraceback().rstrip(),
+ )
yield async.concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
@@ -203,8 +208,8 @@ class FederationServer(FederationBase):
)
pdu_failures = getattr(transaction, "pdu_failures", [])
- for failure in pdu_failures:
- logger.info("Got failure %r", failure)
+ for fail in pdu_failures:
+ logger.info("Got failure %r", fail)
response = {
"pdus": pdu_results,
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 5a956ecfb3..2e5faeb96a 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -30,7 +30,8 @@ from synapse.metrics import (
sent_edus_counter,
sent_transactions_counter,
)
-from synapse.util import PreserveLoggingContext, logcontext
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import logcontext
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@@ -165,10 +166,11 @@ class TransactionQueue(object):
if self._is_processing:
return
- # fire off a processing loop in the background. It's likely it will
- # outlast the current request, so run it in the sentinel logcontext.
- with PreserveLoggingContext():
- self._process_event_queue_loop()
+ # fire off a processing loop in the background
+ run_as_background_process(
+ "process_event_queue_for_federation",
+ self._process_event_queue_loop,
+ )
@defer.inlineCallbacks
def _process_event_queue_loop(self):
@@ -301,6 +303,7 @@ class TransactionQueue(object):
Args:
states (list(UserPresenceState))
"""
+ return
# First we queue up the new presence by user ID, so multiple presence
# updates in quick succession are correctly handled
@@ -432,14 +435,11 @@ class TransactionQueue(object):
logger.debug("TX [%s] Starting transaction loop", destination)
- # Drop the logcontext before starting the transaction. It doesn't
- # really make sense to log all the outbound transactions against
- # whatever path led us to this point: that's pretty arbitrary really.
- #
- # (this also means we can fire off _perform_transaction without
- # yielding)
- with logcontext.PreserveLoggingContext():
- self._transaction_transmission_loop(destination)
+ run_as_background_process(
+ "federation_transaction_transmission_loop",
+ self._transaction_transmission_loop,
+ destination,
+ )
@defer.inlineCallbacks
def _transaction_transmission_loop(self, destination):
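
`run_as_background_process` replaces the bare
`with PreserveLoggingContext(): ...` pattern: it runs the function in
its own logcontext (so the work is not charged to the triggering
request) and records metrics under the given name. A minimal sketch
(the name and loop body are illustrative):

    from twisted.internet import defer

    from synapse.metrics.background_process_metrics import (
        run_as_background_process,
    )

    @defer.inlineCallbacks
    def _example_loop():
        # long-running work, detached from the request that kicked it off
        yield defer.succeed(None)

    run_as_background_process("example_loop", _example_loop)
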
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 4b9923d8c0..413425fed1 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,9 +17,7 @@ from .admin import AdminHandler
from .directory import DirectoryHandler
from .federation import FederationHandler
from .identity import IdentityHandler
-from .message import MessageHandler
from .register import RegistrationHandler
-from .room import RoomContextHandler
from .search import SearchHandler
@@ -44,10 +42,8 @@ class Handlers(object):
def __init__(self, hs):
self.registration_handler = RegistrationHandler(hs)
- self.message_handler = MessageHandler(hs)
self.federation_handler = FederationHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
self.identity_handler = IdentityHandler(hs)
self.search_handler = SearchHandler(hs)
- self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index b6a8b3aa3b..704181d2d3 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -112,8 +112,9 @@ class BaseHandler(object):
guest_access = event.content.get("guest_access", "forbidden")
if guest_access != "can_join":
if context:
+ current_state_ids = yield context.get_current_state_ids(self.store)
current_state = yield self.store.get_events(
- list(context.current_state_ids.values())
+ list(current_state_ids.values())
)
else:
current_state = yield self.state_handler.get_current_state(
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ec9fe01a5a..ee41aed69e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure
@@ -106,7 +107,9 @@ class ApplicationServicesHandler(object):
yield self._check_user_exists(event.state_key)
if not self.started_scheduler:
- self.scheduler.start().addErrback(log_failure)
+ def start_scheduler():
+ return self.scheduler.start().addErrback(log_failure)
+ run_as_background_process("as_scheduler", start_scheduler)
self.started_scheduler = True
# Fork off pushes to these services
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 60391d07c4..c0dd468940 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -21,8 +21,8 @@ import logging
import sys
import six
-from six import iteritems
-from six.moves import http_client
+from six import iteritems, itervalues
+from six.moves import http_client, zip
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
@@ -486,7 +486,10 @@ class FederationHandler(BaseHandler):
# joined the room. Don't bother if the user is just
# changing their profile info.
newly_joined = True
- prev_state_id = context.prev_state_ids.get(
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ prev_state_id = prev_state_ids.get(
(event.type, event.state_key)
)
if prev_state_id:
@@ -731,7 +734,7 @@ class FederationHandler(BaseHandler):
"""
joined_users = [
(state_key, int(event.depth))
- for (e_type, state_key), event in state.iteritems()
+ for (e_type, state_key), event in iteritems(state)
if e_type == EventTypes.Member
and event.membership == Membership.JOIN
]
@@ -748,7 +751,7 @@ class FederationHandler(BaseHandler):
except Exception:
pass
- return sorted(joined_domains.iteritems(), key=lambda d: d[1])
+ return sorted(joined_domains.items(), key=lambda d: d[1])
curr_domains = get_domains_from_state(curr_state)
@@ -811,7 +814,7 @@ class FederationHandler(BaseHandler):
tried_domains = set(likely_domains)
tried_domains.add(self.server_name)
- event_ids = list(extremities.iterkeys())
+ event_ids = list(extremities.keys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = logcontext.preserve_fn(
@@ -827,15 +830,15 @@ class FederationHandler(BaseHandler):
states = dict(zip(event_ids, [s.state for s in states]))
state_map = yield self.store.get_events(
- [e_id for ids in states.itervalues() for e_id in ids.itervalues()],
+ [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
get_prev_content=False
)
states = {
key: {
k: state_map[e_id]
- for k, e_id in state_dict.iteritems()
+ for k, e_id in iteritems(state_dict)
if e_id in state_map
- } for key, state_dict in states.iteritems()
+ } for key, state_dict in iteritems(states)
}
for e_id, _ in sorted_extremeties_tuple:
@@ -906,16 +909,6 @@ class FederationHandler(BaseHandler):
[auth_id for auth_id, _ in event.auth_events],
include_given=True
)
-
- for event in auth:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
defer.returnValue([e for e in auth])
@log_function
@@ -1116,10 +1109,12 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
- state_ids = list(context.prev_state_ids.values())
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ state_ids = list(prev_state_ids.values())
auth_chain = yield self.store.get_auth_chain(state_ids)
- state = yield self.store.get_events(list(context.prev_state_ids.values()))
+ state = yield self.store.get_events(list(prev_state_ids.values()))
defer.returnValue({
"state": list(state.values()),
@@ -1375,18 +1370,6 @@ class FederationHandler(BaseHandler):
del results[(event.type, event.state_key)]
res = list(results.values())
- for event in res:
- # We sign these again because there was a bug where we
- # incorrectly signed things the first time round
- if self.is_mine_id(event.event_id):
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
defer.returnValue(res)
else:
defer.returnValue([])
@@ -1461,18 +1444,6 @@ class FederationHandler(BaseHandler):
)
if event:
- if self.is_mine_id(event.event_id):
- # FIXME: This is a temporary work around where we occasionally
- # return events slightly differently than when they were
- # originally signed
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
in_room = yield self.auth.check_host_in_room(
event.room_id,
origin
@@ -1556,7 +1527,7 @@ class FederationHandler(BaseHandler):
yield self.store.persist_events(
[
(ev_info["event"], context)
- for ev_info, context in itertools.izip(event_infos, contexts)
+ for ev_info, context in zip(event_infos, contexts)
],
backfilled=backfilled,
)
@@ -1676,8 +1647,9 @@ class FederationHandler(BaseHandler):
)
if not auth_events:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -1747,15 +1719,6 @@ class FederationHandler(BaseHandler):
local_auth_chain, remote_auth_chain
)
- for event in ret["auth_chain"]:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
logger.debug("on_query_auth returning: %s", ret)
defer.returnValue(ret)
@@ -1936,9 +1899,10 @@ class FederationHandler(BaseHandler):
break
if do_resolution:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
# 1. Get what we think is the auth chain.
auth_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids
+ event, prev_state_ids
)
local_auth_chain = yield self.store.get_auth_chain(
auth_ids, include_given=True
@@ -2028,21 +1992,34 @@ class FederationHandler(BaseHandler):
k: a.event_id for k, a in iteritems(auth_events)
if k != event_key
}
- context.current_state_ids = dict(context.current_state_ids)
- context.current_state_ids.update(state_updates)
- if context.delta_ids is not None:
- context.delta_ids = dict(context.delta_ids)
- context.delta_ids.update(state_updates)
- context.prev_state_ids = dict(context.prev_state_ids)
- context.prev_state_ids.update({
+ current_state_ids = yield context.get_current_state_ids(self.store)
+ current_state_ids = dict(current_state_ids)
+
+ current_state_ids.update(state_updates)
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_state_ids = dict(prev_state_ids)
+
+ prev_state_ids.update({
k: a.event_id for k, a in iteritems(auth_events)
})
- context.state_group = yield self.store.store_state_group(
+
+ # create a new state group as a delta from the existing one.
+ prev_group = context.state_group
+ state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
- prev_group=context.prev_group,
- delta_ids=context.delta_ids,
- current_state_ids=context.current_state_ids,
+ prev_group=prev_group,
+ delta_ids=state_updates,
+ current_state_ids=current_state_ids,
+ )
+
+ yield context.update_state(
+ state_group=state_group,
+ current_state_ids=current_state_ids,
+ prev_state_ids=prev_state_ids,
+ prev_group=prev_group,
+ delta_ids=state_updates,
)
@defer.inlineCallbacks
@@ -2282,7 +2259,8 @@ class FederationHandler(BaseHandler):
event.content["third_party_invite"]["signed"]["token"]
)
original_invite = None
- original_invite_id = context.prev_state_ids.get(key)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ original_invite_id = prev_state_ids.get(key)
if original_invite_id:
original_invite = yield self.store.get_event(
original_invite_id, allow_none=True
@@ -2324,7 +2302,8 @@ class FederationHandler(BaseHandler):
signed = event.content["third_party_invite"]["signed"]
token = signed["token"]
- invite_event_id = context.prev_state_ids.get(
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ invite_event_id = prev_state_ids.get(
(EventTypes.ThirdPartyInvite, token,)
)
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index fb11716eb8..9250b4d1d8 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
try:
if event.membership == Membership.JOIN:
room_end_token = now_token.room_key
- deferred_room_state = self.state_handler.get_current_state(
- event.room_id
+ deferred_room_state = run_in_background(
+ self.state_handler.get_current_state,
+ event.room_id,
)
elif event.membership == Membership.LEAVE:
room_end_token = "s%d" % (event.stream_ordering,)
- deferred_room_state = self.store.get_state_for_events(
- [event.event_id], None
+ deferred_room_state = run_in_background(
+ self.store.get_state_for_events,
+ [event.event_id], None,
)
deferred_room_state.addCallback(
lambda states: states[event.event_id]
@@ -370,6 +372,7 @@ class InitialSyncHandler(BaseHandler):
@defer.inlineCallbacks
def get_presence():
+ defer.returnValue([])
states = yield presence_handler.get_states(
[m.user_id for m in room_members],
as_event=True,
@@ -387,19 +390,21 @@ class InitialSyncHandler(BaseHandler):
receipts = []
defer.returnValue(receipts)
- presence, receipts, (messages, token) = yield defer.gatherResults(
- [
- run_in_background(get_presence),
- run_in_background(get_receipts),
- run_in_background(
- self.store.get_recent_events_for_room,
- room_id,
- limit=limit,
- end_token=now_token.room_key,
- )
- ],
- consumeErrors=True,
- ).addErrback(unwrapFirstError)
+ presence, receipts, (messages, token) = yield make_deferred_yieldable(
+ defer.gatherResults(
+ [
+ run_in_background(get_presence),
+ run_in_background(get_receipts),
+ run_in_background(
+ self.store.get_recent_events_for_room,
+ room_id,
+ limit=limit,
+ end_token=now_token.room_key,
+ )
+ ],
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError),
+ )
messages = yield filter_events_for_client(
self.store, user_id, messages, is_peeking=is_peeking,
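
Yielding a raw `defer.gatherResults` from an inlineCallbacks function is
not logcontext-safe; the change launches each fetch with
`run_in_background` and then waits on the combined deferred via
`make_deferred_yieldable`. The pattern in isolation (a sketch;
`get_a`/`get_b` are placeholders):

    from twisted.internet import defer

    from synapse.util.logcontext import (
        make_deferred_yieldable,
        run_in_background,
    )

    @defer.inlineCallbacks
    def fetch_both(get_a, get_b):
        a, b = yield make_deferred_yieldable(defer.gatherResults(
            [
                run_in_background(get_a),
                run_in_background(get_b),
            ],
            consumeErrors=True,
        ))
        defer.returnValue((a, b))
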
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a39b852ceb..9a8c03c3a5 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from twisted.internet.defer import succeed
-from twisted.python.failure import Failure
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
@@ -32,247 +31,26 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import send_event_to_master
-from synapse.types import RoomAlias, RoomStreamToken, UserID
-from synapse.util.async import Limiter, ReadWriteLock
+from synapse.types import RoomAlias, UserID
+from synapse.util.async import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
-from synapse.util.stringutils import random_string
-from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
logger = logging.getLogger(__name__)
-class PurgeStatus(object):
- """Object tracking the status of a purge request
-
- This class contains information on the progress of a purge request, for
- return by get_purge_status.
-
- Attributes:
- status (int): Tracks whether this request has completed. One of
- STATUS_{ACTIVE,COMPLETE,FAILED}
+class MessageHandler(object):
+ """Contains some read only APIs to get state about a room
"""
- STATUS_ACTIVE = 0
- STATUS_COMPLETE = 1
- STATUS_FAILED = 2
-
- STATUS_TEXT = {
- STATUS_ACTIVE: "active",
- STATUS_COMPLETE: "complete",
- STATUS_FAILED: "failed",
- }
-
- def __init__(self):
- self.status = PurgeStatus.STATUS_ACTIVE
-
- def asdict(self):
- return {
- "status": PurgeStatus.STATUS_TEXT[self.status]
- }
-
-
-class MessageHandler(BaseHandler):
-
def __init__(self, hs):
- super(MessageHandler, self).__init__(hs)
- self.hs = hs
- self.state = hs.get_state_handler()
+ self.auth = hs.get_auth()
self.clock = hs.get_clock()
-
- self.pagination_lock = ReadWriteLock()
- self._purges_in_progress_by_room = set()
- # map from purge id to PurgeStatus
- self._purges_by_id = {}
-
- def start_purge_history(self, room_id, token,
- delete_local_events=False):
- """Start off a history purge on a room.
-
- Args:
- room_id (str): The room to purge from
-
- token (str): topological token to delete events before
- delete_local_events (bool): True to delete local events as well as
- remote ones
-
- Returns:
- str: unique ID for this purge transaction.
- """
- if room_id in self._purges_in_progress_by_room:
- raise SynapseError(
- 400,
- "History purge already in progress for %s" % (room_id, ),
- )
-
- purge_id = random_string(16)
-
- # we log the purge_id here so that it can be tied back to the
- # request id in the log lines.
- logger.info("[purge] starting purge_id %s", purge_id)
-
- self._purges_by_id[purge_id] = PurgeStatus()
- run_in_background(
- self._purge_history,
- purge_id, room_id, token, delete_local_events,
- )
- return purge_id
-
- @defer.inlineCallbacks
- def _purge_history(self, purge_id, room_id, token,
- delete_local_events):
- """Carry out a history purge on a room.
-
- Args:
- purge_id (str): The id for this purge
- room_id (str): The room to purge from
- token (str): topological token to delete events before
- delete_local_events (bool): True to delete local events as well as
- remote ones
-
- Returns:
- Deferred
- """
- self._purges_in_progress_by_room.add(room_id)
- try:
- with (yield self.pagination_lock.write(room_id)):
- yield self.store.purge_history(
- room_id, token, delete_local_events,
- )
- logger.info("[purge] complete")
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
- except Exception:
- logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
- finally:
- self._purges_in_progress_by_room.discard(room_id)
-
- # remove the purge from the list 24 hours after it completes
- def clear_purge():
- del self._purges_by_id[purge_id]
- self.hs.get_reactor().callLater(24 * 3600, clear_purge)
-
- def get_purge_status(self, purge_id):
- """Get the current status of an active purge
-
- Args:
- purge_id (str): purge_id returned by start_purge_history
-
- Returns:
- PurgeStatus|None
- """
- return self._purges_by_id.get(purge_id)
-
- @defer.inlineCallbacks
- def get_messages(self, requester, room_id=None, pagin_config=None,
- as_client_event=True, event_filter=None):
- """Get messages in a room.
-
- Args:
- requester (Requester): The user requesting messages.
- room_id (str): The room they want messages from.
- pagin_config (synapse.api.streams.PaginationConfig): The pagination
- config rules to apply, if any.
- as_client_event (bool): True to get events in client-server format.
- event_filter (Filter): Filter to apply to results or None
- Returns:
- dict: Pagination API results
- """
- user_id = requester.user.to_string()
-
- if pagin_config.from_token:
- room_token = pagin_config.from_token.room_key
- else:
- pagin_config.from_token = (
- yield self.hs.get_event_sources().get_current_token_for_room(
- room_id=room_id
- )
- )
- room_token = pagin_config.from_token.room_key
-
- room_token = RoomStreamToken.parse(room_token)
-
- pagin_config.from_token = pagin_config.from_token.copy_and_replace(
- "room_key", str(room_token)
- )
-
- source_config = pagin_config.get_source_config("room")
-
- with (yield self.pagination_lock.read(room_id)):
- membership, member_event_id = yield self._check_in_room_or_world_readable(
- room_id, user_id
- )
-
- if source_config.direction == 'b':
- # if we're going backwards, we might need to backfill. This
- # requires that we have a topo token.
- if room_token.topological:
- max_topo = room_token.topological
- else:
- max_topo = yield self.store.get_max_topological_token(
- room_id, room_token.stream
- )
-
- if membership == Membership.LEAVE:
- # If they have left the room then clamp the token to be before
- # they left the room, to save the effort of loading from the
- # database.
- leave_token = yield self.store.get_topological_token_for_event(
- member_event_id
- )
- leave_token = RoomStreamToken.parse(leave_token)
- if leave_token.topological < max_topo:
- source_config.from_key = str(leave_token)
-
- yield self.hs.get_handlers().federation_handler.maybe_backfill(
- room_id, max_topo
- )
-
- events, next_key = yield self.store.paginate_room_events(
- room_id=room_id,
- from_key=source_config.from_key,
- to_key=source_config.to_key,
- direction=source_config.direction,
- limit=source_config.limit,
- event_filter=event_filter,
- )
-
- next_token = pagin_config.from_token.copy_and_replace(
- "room_key", next_key
- )
-
- if not events:
- defer.returnValue({
- "chunk": [],
- "start": pagin_config.from_token.to_string(),
- "end": next_token.to_string(),
- })
-
- if event_filter:
- events = event_filter.filter(events)
-
- events = yield filter_events_for_client(
- self.store,
- user_id,
- events,
- is_peeking=(member_event_id is None),
- )
-
- time_now = self.clock.time_msec()
-
- chunk = {
- "chunk": [
- serialize_event(e, time_now, as_client_event)
- for e in events
- ],
- "start": pagin_config.from_token.to_string(),
- "end": next_token.to_string(),
- }
-
- defer.returnValue(chunk)
+ self.state = hs.get_state_handler()
+ self.store = hs.get_datastore()
@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
@@ -286,12 +64,12 @@ class MessageHandler(BaseHandler):
Raises:
SynapseError if something went wrong.
"""
- membership, membership_event_id = yield self._check_in_room_or_world_readable(
+ membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
- data = yield self.state_handler.get_current_state(
+ data = yield self.state.get_current_state(
room_id, event_type, state_key
)
elif membership == Membership.LEAVE:
@@ -304,31 +82,6 @@ class MessageHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
- def _check_in_room_or_world_readable(self, room_id, user_id):
- try:
- # check_user_was_in_room will return the most recent membership
- # event for the user if:
- # * The user is a non-guest user, and was ever in the room
- # * The user is a guest user, and has joined the room
- # else it will throw.
- member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
- defer.returnValue((member_event.membership, member_event.event_id))
- return
- except AuthError:
- visibility = yield self.state_handler.get_current_state(
- room_id, EventTypes.RoomHistoryVisibility, ""
- )
- if (
- visibility and
- visibility.content["history_visibility"] == "world_readable"
- ):
- defer.returnValue((Membership.JOIN, None))
- return
- raise AuthError(
- 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
- )
-
- @defer.inlineCallbacks
def get_state_events(self, user_id, room_id, is_guest=False):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
@@ -340,12 +93,12 @@ class MessageHandler(BaseHandler):
Returns:
A list of dicts representing state events. [{}, {}, {}]
"""
- membership, membership_event_id = yield self._check_in_room_or_world_readable(
+ membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
- room_state = yield self.state_handler.get_current_state(room_id)
+ room_state = yield self.state.get_current_state(room_id)
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], None
@@ -373,7 +126,7 @@ class MessageHandler(BaseHandler):
if not requester.app_service:
# We check AS auth after fetching the room membership, as it
# requires us to pull out all joined members anyway.
- membership, _ = yield self._check_in_room_or_world_readable(
+ membership, _ = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership != Membership.JOIN:
@@ -386,7 +139,7 @@ class MessageHandler(BaseHandler):
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
# is a user in the room that the AS is "interested in"
- if requester.app_service and user_id not in users_with_profile:
+ if False and requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
@@ -427,7 +180,7 @@ class EventCreationHandler(object):
# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
- self.limiter = Limiter(max_count=5)
+ self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
self.action_generator = hs.get_action_generator()
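
The new limiter caps concurrent event creation per room at five and
queues further callers on the same key. A hedged usage sketch (the
function shown is illustrative; only the Linearizer construction matches
the patch):

    from twisted.internet import defer

    from synapse.util.async import Linearizer

    limiter = Linearizer(max_count=5, name="room_event_creation_limit")

    @defer.inlineCallbacks
    def create_event_sketch(room_id):
        # At most five concurrent bodies per room_id; the sixth waits.
        with (yield limiter.queue(room_id)):
            yield defer.succeed(None)  # event creation would go here
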
@@ -630,7 +383,8 @@ class EventCreationHandler(object):
If so, returns the version of the event in context.
Otherwise, returns None.
"""
- prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_event_id = prev_state_ids.get((event.type, event.state_key))
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
if not prev_event:
return
@@ -752,8 +506,8 @@ class EventCreationHandler(object):
event = builder.build()
logger.debug(
- "Created event %s with state: %s",
- event.event_id, context.prev_state_ids,
+ "Created event %s",
+ event.event_id,
)
defer.returnValue(
@@ -806,8 +560,9 @@ class EventCreationHandler(object):
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
- self.hs.get_clock(),
- self.http_client,
+ clock=self.hs.get_clock(),
+ store=self.store,
+ client=self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester,
@@ -884,9 +639,11 @@ class EventCreationHandler(object):
e.sender == event.sender
)
+ current_state_ids = yield context.get_current_state_ids(self.store)
+
state_to_include_ids = [
e_id
- for k, e_id in iteritems(context.current_state_ids)
+ for k, e_id in iteritems(current_state_ids)
if k[0] in self.hs.config.room_invite_state_types
or k == (EventTypes.Member, event.sender)
]
@@ -922,8 +679,9 @@ class EventCreationHandler(object):
)
if event.type == EventTypes.Redaction:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -943,11 +701,13 @@ class EventCreationHandler(object):
"You don't have permission to redact events"
)
- if event.type == EventTypes.Create and context.prev_state_ids:
- raise AuthError(
- 403,
- "Changing the room create event is forbidden",
- )
+ if event.type == EventTypes.Create:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ if prev_state_ids:
+ raise AuthError(
+ 403,
+ "Changing the room create event is forbidden",
+ )
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
new file mode 100644
index 0000000000..b2849783ed
--- /dev/null
+++ b/synapse/handlers/pagination.py
@@ -0,0 +1,265 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 - 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+from twisted.python.failure import Failure
+
+from synapse.api.constants import Membership
+from synapse.api.errors import SynapseError
+from synapse.events.utils import serialize_event
+from synapse.types import RoomStreamToken
+from synapse.util.async import ReadWriteLock
+from synapse.util.logcontext import run_in_background
+from synapse.util.stringutils import random_string
+from synapse.visibility import filter_events_for_client
+
+logger = logging.getLogger(__name__)
+
+
+class PurgeStatus(object):
+ """Object tracking the status of a purge request
+
+ This class contains information on the progress of a purge request, for
+ return by get_purge_status.
+
+ Attributes:
+ status (int): Tracks whether this request has completed. One of
+ STATUS_{ACTIVE,COMPLETE,FAILED}
+ """
+
+ STATUS_ACTIVE = 0
+ STATUS_COMPLETE = 1
+ STATUS_FAILED = 2
+
+ STATUS_TEXT = {
+ STATUS_ACTIVE: "active",
+ STATUS_COMPLETE: "complete",
+ STATUS_FAILED: "failed",
+ }
+
+ def __init__(self):
+ self.status = PurgeStatus.STATUS_ACTIVE
+
+ def asdict(self):
+ return {
+ "status": PurgeStatus.STATUS_TEXT[self.status]
+ }
+
+
+class PaginationHandler(object):
+ """Handles pagination and purge history requests.
+
+ These are in the same handler because we need to block clients from
+ paginating during a purge.
+ """
+
+ def __init__(self, hs):
+ self.hs = hs
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+
+ self.pagination_lock = ReadWriteLock()
+ self._purges_in_progress_by_room = set()
+ # map from purge id to PurgeStatus
+ self._purges_by_id = {}
+
+ def start_purge_history(self, room_id, token,
+ delete_local_events=False):
+ """Start off a history purge on a room.
+
+ Args:
+ room_id (str): The room to purge from
+
+ token (str): topological token to delete events before
+ delete_local_events (bool): True to delete local events as well as
+ remote ones
+
+ Returns:
+ str: unique ID for this purge transaction.
+ """
+ if room_id in self._purges_in_progress_by_room:
+ raise SynapseError(
+ 400,
+ "History purge already in progress for %s" % (room_id, ),
+ )
+
+ purge_id = random_string(16)
+
+ # we log the purge_id here so that it can be tied back to the
+ # request id in the log lines.
+ logger.info("[purge] starting purge_id %s", purge_id)
+
+ self._purges_by_id[purge_id] = PurgeStatus()
+ run_in_background(
+ self._purge_history,
+ purge_id, room_id, token, delete_local_events,
+ )
+ return purge_id
+
+ @defer.inlineCallbacks
+ def _purge_history(self, purge_id, room_id, token,
+ delete_local_events):
+ """Carry out a history purge on a room.
+
+ Args:
+ purge_id (str): The id for this purge
+ room_id (str): The room to purge from
+ token (str): topological token to delete events before
+ delete_local_events (bool): True to delete local events as well as
+ remote ones
+
+ Returns:
+ Deferred
+ """
+ self._purges_in_progress_by_room.add(room_id)
+ try:
+ with (yield self.pagination_lock.write(room_id)):
+ yield self.store.purge_history(
+ room_id, token, delete_local_events,
+ )
+ logger.info("[purge] complete")
+ self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+ except Exception:
+ logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+ self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+ finally:
+ self._purges_in_progress_by_room.discard(room_id)
+
+ # remove the purge from the list 24 hours after it completes
+ def clear_purge():
+ del self._purges_by_id[purge_id]
+ self.hs.get_reactor().callLater(24 * 3600, clear_purge)
+
+ def get_purge_status(self, purge_id):
+ """Get the current status of an active purge
+
+ Args:
+ purge_id (str): purge_id returned by start_purge_history
+
+ Returns:
+ PurgeStatus|None
+ """
+ return self._purges_by_id.get(purge_id)
+
+ @defer.inlineCallbacks
+ def get_messages(self, requester, room_id=None, pagin_config=None,
+ as_client_event=True, event_filter=None):
+ """Get messages in a room.
+
+ Args:
+ requester (Requester): The user requesting messages.
+ room_id (str): The room they want messages from.
+ pagin_config (synapse.api.streams.PaginationConfig): The pagination
+ config rules to apply, if any.
+ as_client_event (bool): True to get events in client-server format.
+ event_filter (Filter): Filter to apply to results or None
+ Returns:
+ dict: Pagination API results
+ """
+ user_id = requester.user.to_string()
+
+ if pagin_config.from_token:
+ room_token = pagin_config.from_token.room_key
+ else:
+ pagin_config.from_token = (
+ yield self.hs.get_event_sources().get_current_token_for_room(
+ room_id=room_id
+ )
+ )
+ room_token = pagin_config.from_token.room_key
+
+ room_token = RoomStreamToken.parse(room_token)
+
+ pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+ "room_key", str(room_token)
+ )
+
+ source_config = pagin_config.get_source_config("room")
+
+ with (yield self.pagination_lock.read(room_id)):
+ membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
+ room_id, user_id
+ )
+
+ if source_config.direction == 'b':
+ # if we're going backwards, we might need to backfill. This
+ # requires that we have a topo token.
+ if room_token.topological:
+ max_topo = room_token.topological
+ else:
+ max_topo = yield self.store.get_max_topological_token(
+ room_id, room_token.stream
+ )
+
+ if membership == Membership.LEAVE:
+ # If they have left the room then clamp the token to be before
+ # they left the room, to save the effort of loading from the
+ # database.
+ leave_token = yield self.store.get_topological_token_for_event(
+ member_event_id
+ )
+ leave_token = RoomStreamToken.parse(leave_token)
+ if leave_token.topological < max_topo:
+ source_config.from_key = str(leave_token)
+
+ yield self.hs.get_handlers().federation_handler.maybe_backfill(
+ room_id, max_topo
+ )
+
+ events, next_key = yield self.store.paginate_room_events(
+ room_id=room_id,
+ from_key=source_config.from_key,
+ to_key=source_config.to_key,
+ direction=source_config.direction,
+ limit=source_config.limit,
+ event_filter=event_filter,
+ )
+
+ next_token = pagin_config.from_token.copy_and_replace(
+ "room_key", next_key
+ )
+
+ if not events:
+ defer.returnValue({
+ "chunk": [],
+ "start": pagin_config.from_token.to_string(),
+ "end": next_token.to_string(),
+ })
+
+ if event_filter:
+ events = event_filter.filter(events)
+
+ events = yield filter_events_for_client(
+ self.store,
+ user_id,
+ events,
+ is_peeking=(member_event_id is None),
+ )
+
+ time_now = self.clock.time_msec()
+
+ chunk = {
+ "chunk": [
+ serialize_event(e, time_now, as_client_event)
+ for e in events
+ ],
+ "start": pagin_config.from_token.to_string(),
+ "end": next_token.to_string(),
+ }
+
+ defer.returnValue(chunk)
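
The purge machinery above is asynchronous: start_purge_history hands back a
purge_id immediately, _purge_history runs under the room's pagination write
lock, and callers poll get_purge_status until the PurgeStatus flips to
complete or failed. A minimal sketch of driving this from an admin client,
against the purge servlets further down in this patch (the endpoint paths,
JSON field names and status strings here are assumptions, not taken from
this diff):

    import time

    import requests  # assumption: any HTTP client will do

    BASE = "https://hs.example.com/_matrix/client/api/v1"
    AUTH = {"access_token": "<admin token>"}

    def purge_and_wait(room_id, event_id):
        # Kick off the purge; the servlet validates the token and returns
        # a purge_id without waiting for the deletion to finish.
        resp = requests.post(
            "%s/admin/purge_history/%s" % (BASE, room_id),
            params=AUTH,
            json={"purge_up_to_event_id": event_id},
        )
        purge_id = resp.json()["purge_id"]

        # Poll until the purge leaves the "active" state. The status
        # strings are assumed to mirror PurgeStatus.STATUS_*.
        while True:
            status = requests.get(
                "%s/admin/purge_history_status/%s" % (BASE, purge_id),
                params=AUTH,
            ).json()["status"]
            if status != "active":
                return status
            time.sleep(5)
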
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 3732830194..d030cfe77c 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -390,6 +390,7 @@ class PresenceHandler(object):
"""We've seen the user do something that indicates they're interacting
with the app.
"""
+ return
user_id = user.to_string()
bump_active_time_counter.inc()
@@ -419,6 +420,7 @@ class PresenceHandler(object):
Useful for streams that are not associated with an actual
client that is being used by a user.
"""
+ affect_presence = False
if affect_presence:
curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
self.user_to_num_current_syncs[user_id] = curr_sync + 1
@@ -464,6 +466,8 @@ class PresenceHandler(object):
Returns:
set(str): A set of user_id strings.
"""
+ # presence is disabled on matrix.org, so we return the empty set
+ return set()
syncing_user_ids = {
user_id for user_id, count in self.user_to_num_current_syncs.items()
if count
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index f67512078b..003b848c00 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -24,7 +24,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
-from synapse.types import RoomAlias, RoomID, RoomStreamToken, UserID
+from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
from synapse.visibility import filter_events_for_client
@@ -395,7 +395,11 @@ class RoomCreationHandler(BaseHandler):
)
-class RoomContextHandler(BaseHandler):
+class RoomContextHandler(object):
+ def __init__(self, hs):
+ self.hs = hs
+ self.store = hs.get_datastore()
+
@defer.inlineCallbacks
def get_event_context(self, user, room_id, event_id, limit):
"""Retrieves events, pagination tokens and state around a given event
@@ -414,8 +418,6 @@ class RoomContextHandler(BaseHandler):
before_limit = math.floor(limit / 2.)
after_limit = limit - before_limit
- now_token = yield self.hs.get_event_sources().get_current_token()
-
users = yield self.store.get_users_in_room(room_id)
is_peeking = user.to_string() not in users
@@ -458,11 +460,15 @@ class RoomContextHandler(BaseHandler):
)
results["state"] = list(state[last_event_id].values())
- results["start"] = now_token.copy_and_replace(
+ # We use a dummy token here as we only care about the room portion of
+ # the token, which we replace.
+ token = StreamToken.START
+
+ results["start"] = token.copy_and_replace(
"room_key", results["start"]
).to_string()
- results["end"] = now_token.copy_and_replace(
+ results["end"] = token.copy_and_replace(
"room_key", results["end"]
).to_string()
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 828229f5c3..55985d7803 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -44,9 +44,12 @@ EMPTY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None)
class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
- self.response_cache = ResponseCache(hs, "room_list")
- self.remote_response_cache = ResponseCache(hs, "remote_room_list",
- timeout_ms=30 * 1000)
+ self.response_cache = ResponseCache(
+ hs, "room_list", timeout_ms=10 * 60 * 1000,
+ )
+ self.remote_response_cache = ResponseCache(
+ hs, "remote_room_list", timeout_ms=30 * 1000,
+ )
def get_local_public_room_list(self, limit=None, since_token=None,
search_filter=None,
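
The effect of giving the room-list ResponseCache a ten-minute timeout_ms is
that every request for the public room list within that window shares a
single computation of the (expensive) list. A toy illustration of those
semantics, not the real ResponseCache API, assuming results are cached per
key until their expiry:

    import time

    class ToyResponseCache(object):
        def __init__(self, timeout_ms):
            self._timeout = timeout_ms / 1000.0
            self._entries = {}  # key -> (expiry, result)

        def wrap(self, key, compute):
            # Serve a live entry if we have one; otherwise compute and
            # cache the result until timeout_ms from now.
            entry = self._entries.get(key)
            now = time.time()
            if entry is not None and entry[0] > now:
                return entry[1]
            result = compute()
            self._entries[key] = (now + self._timeout, result)
            return result

    def expensive_room_list():
        return ["!room:example.com"]  # stand-in for the real computation

    cache = ToyResponseCache(timeout_ms=10 * 60 * 1000)
    rooms = cache.wrap((), expensive_room_list)
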
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 00f2e279bc..6184737cd4 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -66,6 +66,7 @@ class RoomMemberHandler(object):
self.event_creation_hander = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member")
+ self.member_limiter = Linearizer(max_count=10, name="member_as_limiter")
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()
@@ -201,7 +202,9 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
)
- prev_member_event_id = context.prev_state_ids.get(
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, target.to_string()),
None
)
@@ -239,18 +242,37 @@ class RoomMemberHandler(object):
):
key = (room_id,)
- with (yield self.member_linearizer.queue(key)):
- result = yield self._update_membership(
- requester,
- target,
- room_id,
- action,
- txn_id=txn_id,
- remote_room_hosts=remote_room_hosts,
- third_party_signed=third_party_signed,
- ratelimit=ratelimit,
- content=content,
- )
+ as_id = object()
+ if requester.app_service:
+ as_id = requester.app_service.id
+
+ then = self.clock.time_msec()
+
+ with (yield self.member_limiter.queue(as_id)):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took too long to process")
+
+ with (yield self.member_linearizer.queue(key)):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took too long to process")
+
+ result = yield self._update_membership(
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=txn_id,
+ remote_room_hosts=remote_room_hosts,
+ third_party_signed=third_party_signed,
+ ratelimit=ratelimit,
+ content=content,
+ )
defer.returnValue(result)
@@ -496,9 +518,10 @@ class RoomMemberHandler(object):
if prev_event is not None:
return
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
if event.membership == Membership.JOIN:
if requester.is_guest:
- guest_can_join = yield self._can_guest_join(context.prev_state_ids)
+ guest_can_join = yield self._can_guest_join(prev_state_ids)
if not guest_can_join:
# This should be an auth check, but guests are a local concept,
# so don't really fit into the general auth process.
@@ -517,7 +540,7 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
)
- prev_member_event_id = context.prev_state_ids.get(
+ prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, event.state_key),
None
)
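
The membership path now queues twice: member_limiter, keyed by app service
id with max_count=10, caps how many membership operations a single app
service can have in flight, while member_linearizer keeps the strict
one-at-a-time ordering per room. Note that `as_id = object()` gives every
non-app-service request a fresh, unique key, so ordinary clients are never
throttled by the limiter. A toy sketch of the two-level queueing using
threading primitives (the real code uses Synapse's Deferred-based
Linearizer, not threads):

    import threading
    from collections import defaultdict

    as_limiter = defaultdict(lambda: threading.Semaphore(10))  # per-AS cap
    room_linearizer = defaultdict(threading.Lock)  # per-room ordering

    def update_membership(as_id, room_id, do_update):
        with as_limiter[as_id]:             # at most 10 concurrent per AS
            with room_linearizer[room_id]:  # one at a time per room
                return do_update()
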
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c24e35362a..a006c952f2 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -32,6 +32,7 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
+SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
@@ -178,7 +179,9 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
- self.response_cache = ResponseCache(hs, "sync")
+ self.response_cache = ResponseCache(
+ hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS,
+ )
self.state = hs.get_state_handler()
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
@@ -620,7 +623,7 @@ class SyncHandler(object):
since_token is None and
sync_config.filter_collection.blocks_all_presence()
)
- if not block_all_presence_data:
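+ # presence is disabled on matrix.org, so never generate the presence sync entry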
+ if False and not block_all_presence_data:
yield self._generate_sync_entry_for_presence(
sync_result_builder, newly_joined_rooms, newly_joined_users
)
diff --git a/synapse/http/client.py b/synapse/http/client.py
index d6a0d75b2b..25b6307884 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -26,9 +26,11 @@ from OpenSSL.SSL import VERIFY_NONE
from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, BrowserLikeRedirectAgent, ContentDecoderAgent
-from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer
from twisted.web.client import (
+ Agent,
+ BrowserLikeRedirectAgent,
+ ContentDecoderAgent,
+ FileBodyProducer as TwistedFileBodyProducer,
GzipDecoder,
HTTPConnectionPool,
PartialDownloadError,
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index f24b4b949c..588e280571 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -38,7 +38,8 @@ outgoing_responses_counter = Counter(
)
response_timer = Histogram(
- "synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"]
+ "synapse_http_server_response_time_seconds", "sec",
+ ["method", "servlet", "tag", "code"],
)
response_ru_utime = Counter(
@@ -171,11 +172,13 @@ class RequestMetrics(object):
)
return
- outgoing_responses_counter.labels(request.method, str(request.code)).inc()
+ response_code = str(request.code)
+
+ outgoing_responses_counter.labels(request.method, response_code).inc()
response_count.labels(request.method, self.name, tag).inc()
- response_timer.labels(request.method, self.name, tag).observe(
+ response_timer.labels(request.method, self.name, tag, response_code).observe(
time_sec - self.start
)
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
new file mode 100644
index 0000000000..9d820e44a6
--- /dev/null
+++ b/synapse/metrics/background_process_metrics.py
@@ -0,0 +1,179 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import six
+
+from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
+
+from twisted.internet import defer
+
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+
+_background_process_start_count = Counter(
+ "synapse_background_process_start_count",
+ "Number of background processes started",
+ ["name"],
+)
+
+# we set registry=None in all of these to stop them getting registered with
+# the default registry. Instead we collect them all via the CustomCollector,
+# which ensures that we can update them before they are collected.
+#
+_background_process_ru_utime = Counter(
+ "synapse_background_process_ru_utime_seconds",
+ "User CPU time used by background processes, in seconds",
+ ["name"],
+ registry=None,
+)
+
+_background_process_ru_stime = Counter(
+ "synapse_background_process_ru_stime_seconds",
+ "System CPU time used by background processes, in seconds",
+ ["name"],
+ registry=None,
+)
+
+_background_process_db_txn_count = Counter(
+ "synapse_background_process_db_txn_count",
+ "Number of database transactions done by background processes",
+ ["name"],
+ registry=None,
+)
+
+_background_process_db_txn_duration = Counter(
+ "synapse_background_process_db_txn_duration_seconds",
+ ("Seconds spent by background processes waiting for database "
+ "transactions, excluding scheduling time"),
+ ["name"],
+ registry=None,
+)
+
+_background_process_db_sched_duration = Counter(
+ "synapse_background_process_db_sched_duration_seconds",
+ "Seconds spent by background processes waiting for database connections",
+ ["name"],
+ registry=None,
+)
+
+# map from description to a counter, so that we can name our logcontexts
+# incrementally. (It actually duplicates _background_process_start_count, but
+# it's much simpler to do so than to try to combine them.)
+_background_process_counts = dict() # type: dict[str, int]
+
+# map from description to the currently running background processes.
+#
+# it's kept as a dict of sets rather than a big set so that we can keep track
+# of process descriptions that no longer have any active processes.
+_background_processes = dict() # type: dict[str, set[_BackgroundProcess]]
+
+
+class _Collector(object):
+ """A custom metrics collector for the background process metrics.
+
+ Ensures that all of the metrics are up-to-date with any in-flight processes
+ before they are returned.
+ """
+ def collect(self):
+ background_process_in_flight_count = GaugeMetricFamily(
+ "synapse_background_process_in_flight_count",
+ "Number of background processes in flight",
+ labels=["name"],
+ )
+
+ for desc, processes in six.iteritems(_background_processes):
+ background_process_in_flight_count.add_metric(
+ (desc,), len(processes),
+ )
+ for process in processes:
+ process.update_metrics()
+
+ yield background_process_in_flight_count
+
+ # now we need to run collect() over each of the static Counters, and
+ # yield each metric they return.
+ for m in (
+ _background_process_ru_utime,
+ _background_process_ru_stime,
+ _background_process_db_txn_count,
+ _background_process_db_txn_duration,
+ _background_process_db_sched_duration,
+ ):
+ for r in m.collect():
+ yield r
+
+
+REGISTRY.register(_Collector())
+
+
+class _BackgroundProcess(object):
+ def __init__(self, desc, ctx):
+ self.desc = desc
+ self._context = ctx
+ self._reported_stats = None
+
+ def update_metrics(self):
+ """Updates the metrics with values from this process."""
+ new_stats = self._context.get_resource_usage()
+ if self._reported_stats is None:
+ diff = new_stats
+ else:
+ diff = new_stats - self._reported_stats
+ self._reported_stats = new_stats
+
+ _background_process_ru_utime.labels(self.desc).inc(diff.ru_utime)
+ _background_process_ru_stime.labels(self.desc).inc(diff.ru_stime)
+ _background_process_db_txn_count.labels(self.desc).inc(
+ diff.db_txn_count,
+ )
+ _background_process_db_txn_duration.labels(self.desc).inc(
+ diff.db_txn_duration_sec,
+ )
+ _background_process_db_sched_duration.labels(self.desc).inc(
+ diff.db_sched_duration_sec,
+ )
+
+
+def run_as_background_process(desc, func, *args, **kwargs):
+ """Run the given function in its own logcontext, with resource metrics
+
+ This should be used to wrap processes which are fired off to run in the
+ background, instead of being associated with a particular request.
+
+ Args:
+ desc (str): a description for this background process type
+ func: a function, which may return a Deferred
+ args: positional args for func
+ kwargs: keyword args for func
+
+ Returns: None
+ """
+ @defer.inlineCallbacks
+ def run():
+ count = _background_process_counts.get(desc, 0)
+ _background_process_counts[desc] = count + 1
+ _background_process_start_count.labels(desc).inc()
+
+ with LoggingContext(desc) as context:
+ context.request = "%s-%i" % (desc, count)
+ proc = _BackgroundProcess(desc, context)
+ _background_processes.setdefault(desc, set()).add(proc)
+ try:
+ yield func(*args, **kwargs)
+ finally:
+ proc.update_metrics()
+ _background_processes[desc].remove(proc)
+
+ with PreserveLoggingContext():
+ run()
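
Typical usage, matching the call sites added to the storage layer later in
this patch: wrap a fire-and-forget task so that its CPU and database usage
is attributed to a named background process instead of vanishing into the
sentinel logcontext. A minimal sketch (the pruning job itself is
hypothetical):

    from twisted.internet import defer

    from synapse.metrics.background_process_metrics import (
        run_as_background_process,
    )

    @defer.inlineCallbacks
    def _prune_old_entries(store):
        # hypothetical periodic job; anything returning a Deferred works
        yield store.cleanup_old_entries()

    def start_pruning(store):
        # returns immediately; stats appear under
        # synapse_background_process_*{name="prune_old_entries"}
        run_as_background_process("prune_old_entries", _prune_old_entries, store)
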
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 51cbd66f06..e650c3e494 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -274,7 +274,7 @@ class Notifier(object):
logger.exception("Error notifying application services of event")
def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
- """ Used to inform listeners that something has happend event wise.
+ """ Used to inform listeners that something has happened event wise.
Will wake up all listeners for the given users and rooms.
"""
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index bb181d94ee..1d14d3639c 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -112,7 +112,8 @@ class BulkPushRuleEvaluator(object):
@defer.inlineCallbacks
def _get_power_levels_and_sender_level(self, event, context):
- pl_event_id = context.prev_state_ids.get(POWER_KEY)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ pl_event_id = prev_state_ids.get(POWER_KEY)
if pl_event_id:
# fastpath: if there's a power level event, that's all we need, and
# not having a power level event is an extreme edge case
@@ -120,7 +121,7 @@ class BulkPushRuleEvaluator(object):
auth_events = {POWER_KEY: pl_event}
else:
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=False,
+ event, prev_state_ids, for_verification=False,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -304,7 +305,7 @@ class RulesForRoom(object):
push_rules_delta_state_cache_metric.inc_hits()
else:
- current_state_ids = context.current_state_ids
+ current_state_ids = yield context.get_current_state_ids(self.store)
push_rules_delta_state_cache_metric.inc_misses()
push_rules_state_size_counter.inc(len(current_state_ids))
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 81e18bcf7d..a4e8cafdc9 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -331,7 +331,12 @@ class HttpPusher(object):
if not notification_dict:
defer.returnValue([])
try:
- resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
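+ # matrix.org hack: route the push over the internal network rather
+ # than via the public matrix.org endpoint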
+ url = self.url.replace(
+ "https://matrix.org/_matrix/push/v1/notify",
+ "http://http-priv.matrix.org/_matrix/push/v1/notify",
+ )
+
+ resp = yield self.http_client.post_json_get_json(url, notification_dict)
except Exception:
logger.warn(
"Failed to push event %s to %s",
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 2eede54792..5227bc333d 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -34,12 +34,13 @@ logger = logging.getLogger(__name__)
@defer.inlineCallbacks
-def send_event_to_master(clock, client, host, port, requester, event, context,
+def send_event_to_master(clock, store, client, host, port, requester, event, context,
ratelimit, extra_users):
"""Send event to be handled on the master
Args:
clock (synapse.util.Clock)
+ store (DataStore)
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
@@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context,
host, port, event.event_id,
)
+ serialized_context = yield context.serialize(event, store)
+
payload = {
"event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
- "context": context.serialize(event),
+ "context": serialized_context,
"requester": requester.serialize(),
"ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users],
diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 60641f1a49..5b8521c770 100644
--- a/synapse/replication/slave/storage/client_ips.py
+++ b/synapse/replication/slave/storage/client_ips.py
@@ -43,6 +43,8 @@ class SlavedClientIpStore(BaseSlavedStore):
if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY:
return
+ self.client_ip_last_seen.prefill(key, now)
+
self.hs.get_tcp_replication().send_user_ip(
user_id, access_token, ip, user_agent, device_id, now
)
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index 55fe701c5c..ab65a6792e 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -32,7 +32,7 @@ from twisted.internet import defer
logger = logging.getLogger(__name__)
-MAX_EVENTS_BEHIND = 10000
+MAX_EVENTS_BEHIND = 500000
EventStreamRow = namedtuple("EventStreamRow", (
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 75c2a4ec8e..3418f06fd6 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,13 +14,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from six import PY3
+
from synapse.http.server import JsonResource
from synapse.rest.client import versions
-from synapse.rest.client.v1 import admin, directory, events, initial_sync
-from synapse.rest.client.v1 import login as v1_login
-from synapse.rest.client.v1 import logout, presence, profile, push_rule, pusher
-from synapse.rest.client.v1 import register as v1_register
-from synapse.rest.client.v1 import room, voip
+from synapse.rest.client.v1 import (
+ admin,
+ directory,
+ events,
+ initial_sync,
+ login as v1_login,
+ logout,
+ presence,
+ profile,
+ push_rule,
+ pusher,
+ room,
+ voip,
+)
from synapse.rest.client.v2_alpha import (
account,
account_data,
@@ -42,6 +54,11 @@ from synapse.rest.client.v2_alpha import (
user_directory,
)
+if not PY3:
+ from synapse.rest.client.v1_only import (
+ register as v1_register,
+ )
+
class ClientRestResource(JsonResource):
"""A resource for version 1 of the matrix client API."""
@@ -54,14 +71,22 @@ class ClientRestResource(JsonResource):
def register_servlets(client_resource, hs):
versions.register_servlets(client_resource)
- # "v1"
- room.register_servlets(hs, client_resource)
+ if not PY3:
+ # "v1" (Python 2 only)
+ v1_register.register_servlets(hs, client_resource)
+
+ # Deprecated in r0
+ initial_sync.register_servlets(hs, client_resource)
+ room.register_deprecated_servlets(hs, client_resource)
+
+ # Partially deprecated in r0
events.register_servlets(hs, client_resource)
- v1_register.register_servlets(hs, client_resource)
+
+ # "v1" + "r0"
+ room.register_servlets(hs, client_resource)
v1_login.register_servlets(hs, client_resource)
profile.register_servlets(hs, client_resource)
presence.register_servlets(hs, client_resource)
- initial_sync.register_servlets(hs, client_resource)
directory.register_servlets(hs, client_resource)
voip.register_servlets(hs, client_resource)
admin.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 2dc50e582b..6d276a109a 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import hashlib
+import hmac
import logging
from six.moves import http_client
@@ -63,6 +65,125 @@ class UsersRestServlet(ClientV1RestServlet):
defer.returnValue((200, ret))
+class UserRegisterServlet(ClientV1RestServlet):
+ """
+ Attributes:
+ NONCE_TIMEOUT (int): Seconds until a generated nonce won't be accepted
+ nonces (dict[str, int]): The nonces that we will accept, mapping
+ each nonce to the time it was generated, in integer seconds.
+ """
+ PATTERNS = client_path_patterns("/admin/register")
+ NONCE_TIMEOUT = 60
+
+ def __init__(self, hs):
+ super(UserRegisterServlet, self).__init__(hs)
+ self.handlers = hs.get_handlers()
+ self.reactor = hs.get_reactor()
+ self.nonces = {}
+ self.hs = hs
+
+ def _clear_old_nonces(self):
+ """
+ Clear out nonces that are older than NONCE_TIMEOUT.
+ """
+ now = int(self.reactor.seconds())
+
+ for k, v in list(self.nonces.items()):
+ if now - v > self.NONCE_TIMEOUT:
+ del self.nonces[k]
+
+ def on_GET(self, request):
+ """
+ Generate a new nonce.
+ """
+ self._clear_old_nonces()
+
+ nonce = self.hs.get_secrets().token_hex(64)
+ self.nonces[nonce] = int(self.reactor.seconds())
+ return (200, {"nonce": nonce.encode('ascii')})
+
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ self._clear_old_nonces()
+
+ if not self.hs.config.registration_shared_secret:
+ raise SynapseError(400, "Shared secret registration is not enabled")
+
+ body = parse_json_object_from_request(request)
+
+ if "nonce" not in body:
+ raise SynapseError(
+ 400, "nonce must be specified", errcode=Codes.BAD_JSON,
+ )
+
+ nonce = body["nonce"]
+
+ if nonce not in self.nonces:
+ raise SynapseError(
+ 400, "unrecognised nonce",
+ )
+
+ # Delete the nonce, so it can't be reused, even if it's invalid
+ del self.nonces[nonce]
+
+ if "username" not in body:
+ raise SynapseError(
+ 400, "username must be specified", errcode=Codes.BAD_JSON,
+ )
+ else:
+ if (not isinstance(body['username'], str) or len(body['username']) > 512):
+ raise SynapseError(400, "Invalid username")
+
+ username = body["username"].encode("utf-8")
+ if b"\x00" in username:
+ raise SynapseError(400, "Invalid username")
+
+ if "password" not in body:
+ raise SynapseError(
+ 400, "password must be specified", errcode=Codes.BAD_JSON,
+ )
+ else:
+ if (not isinstance(body['password'], str) or len(body['password']) > 512):
+ raise SynapseError(400, "Invalid password")
+
+ password = body["password"].encode("utf-8")
+ if b"\x00" in password:
+ raise SynapseError(400, "Invalid password")
+
+ admin = body.get("admin", None)
+ got_mac = body["mac"]
+
+ want_mac = hmac.new(
+ key=self.hs.config.registration_shared_secret.encode(),
+ digestmod=hashlib.sha1,
+ )
+ want_mac.update(nonce)
+ want_mac.update(b"\x00")
+ want_mac.update(username)
+ want_mac.update(b"\x00")
+ want_mac.update(password)
+ want_mac.update(b"\x00")
+ want_mac.update(b"admin" if admin else b"notadmin")
+ want_mac = want_mac.hexdigest()
+
+ if not hmac.compare_digest(want_mac, got_mac):
+ raise SynapseError(
+ 403, "HMAC incorrect",
+ )
+
+ # Reuse the parts of RegisterRestServlet to reduce code duplication
+ from synapse.rest.client.v2_alpha.register import RegisterRestServlet
+ register = RegisterRestServlet(self.hs)
+
+ (user_id, _) = yield register.registration_handler.register(
+ localpart=username.lower(), password=password, admin=bool(admin),
+ generate_token=False,
+ )
+
+ result = yield register._create_registration_details(user_id, body)
+ defer.returnValue((200, result))
+
+
class WhoisRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)")
@@ -123,7 +244,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
hs (synapse.server.HomeServer)
"""
super(PurgeHistoryRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.pagination_handler = hs.get_pagination_handler()
self.store = hs.get_datastore()
@defer.inlineCallbacks
@@ -198,7 +319,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
errcode=Codes.BAD_JSON,
)
- purge_id = yield self.handlers.message_handler.start_purge_history(
+ purge_id = yield self.pagination_handler.start_purge_history(
room_id, token,
delete_local_events=delete_local_events,
)
@@ -220,7 +341,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
hs (synapse.server.HomeServer)
"""
super(PurgeHistoryStatusRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.pagination_handler = hs.get_pagination_handler()
@defer.inlineCallbacks
def on_GET(self, request, purge_id):
@@ -230,7 +351,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
if not is_admin:
raise AuthError(403, "You are not a server admin")
- purge_status = self.handlers.message_handler.get_purge_status(purge_id)
+ purge_status = self.pagination_handler.get_purge_status(purge_id)
if purge_status is None:
raise NotFoundError("purge id '%s' not found" % purge_id)
@@ -318,17 +439,6 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
)
new_room_id = info["room_id"]
- yield self.event_creation_handler.create_and_send_nonmember_event(
- room_creator_requester,
- {
- "type": "m.room.message",
- "content": {"body": message, "msgtype": "m.text"},
- "room_id": new_room_id,
- "sender": new_room_user_id,
- },
- ratelimit=False,
- )
-
requester_user_id = requester.user.to_string()
logger.info("Shutting down room %r", room_id)
@@ -366,6 +476,17 @@ class ShutdownRoomRestServlet(ClientV1RestServlet):
kicked_users.append(user_id)
+ yield self.event_creation_handler.create_and_send_nonmember_event(
+ room_creator_requester,
+ {
+ "type": "m.room.message",
+ "content": {"body": message, "msgtype": "m.text"},
+ "room_id": new_room_id,
+ "sender": new_room_user_id,
+ },
+ ratelimit=False,
+ )
+
aliases_for_room = yield self.store.get_aliases_for_room(room_id)
yield self.store.update_aliases_for_room(
@@ -614,3 +735,4 @@ def register_servlets(hs, http_server):
ShutdownRoomRestServlet(hs).register(http_server)
QuarantineMediaInRoom(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)
+ UserRegisterServlet(hs).register(http_server)
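
The new /admin/register endpoint is driven in two steps: fetch a fresh
nonce with a GET, then POST the registration together with an HMAC-SHA1
(keyed with registration_shared_secret) over the NUL-separated nonce,
username, password and admin flag, exactly as UserRegisterServlet verifies
it. A client-side sketch (the base URL and the use of requests are
assumptions):

    import hashlib
    import hmac

    import requests

    URL = "https://hs.example.com/_matrix/client/api/v1/admin/register"

    def register_user(shared_secret, username, password, admin=False):
        nonce = requests.get(URL).json()["nonce"]

        # Build the MAC the same way the servlet does, field by field.
        mac = hmac.new(
            key=shared_secret.encode("utf8"),
            digestmod=hashlib.sha1,
        )
        mac.update(nonce.encode("utf8"))
        mac.update(b"\x00")
        mac.update(username.encode("utf8"))
        mac.update(b"\x00")
        mac.update(password.encode("utf8"))
        mac.update(b"\x00")
        mac.update(b"admin" if admin else b"notadmin")

        return requests.post(URL, json={
            "nonce": nonce,
            "username": username,
            "password": password,
            "admin": admin,
            "mac": mac.hexdigest(),
        }).json()
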
diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py
index a14f0c807e..cb61bfcb21 100644
--- a/synapse/rest/client/v1/presence.py
+++ b/synapse/rest/client/v1/presence.py
@@ -84,7 +84,7 @@ class PresenceStatusRestServlet(ClientV1RestServlet):
except Exception:
raise SynapseError(400, "Unable to parse state")
- yield self.presence_handler.set_state(user, state)
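+ # presence is disabled on matrix.org: accept the request but drop the update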
+ # yield self.presence_handler.set_state(user, state)
defer.returnValue((200, {}))
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 2a679ac830..ee1feeec69 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -90,6 +90,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
self.handlers = hs.get_handlers()
self.event_creation_hander = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler()
+ self.message_handler = hs.get_message_handler()
def register(self, http_server):
# /room/$roomid/state/$eventtype
@@ -124,7 +125,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
format = parse_string(request, "format", default="content",
allowed_values=["content", "event"])
- msg_handler = self.handlers.message_handler
+ msg_handler = self.message_handler
data = yield msg_handler.get_room_data(
user_id=requester.user.to_string(),
room_id=room_id,
@@ -377,14 +378,13 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomMemberListRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
# TODO support Pagination stream API (limit/tokens)
requester = yield self.auth.get_user_by_req(request)
- handler = self.handlers.message_handler
- events = yield handler.get_state_events(
+ events = yield self.message_handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
)
@@ -406,7 +406,7 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(JoinedRoomMemberListRestServlet, self).__init__(hs)
- self.message_handler = hs.get_handlers().message_handler
+ self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
@@ -427,7 +427,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomMessageListRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.pagination_handler = hs.get_pagination_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
@@ -442,8 +442,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
event_filter = Filter(json.loads(filter_json))
else:
event_filter = None
- handler = self.handlers.message_handler
- msgs = yield handler.get_messages(
+ msgs = yield self.pagination_handler.get_messages(
room_id=room_id,
requester=requester,
pagin_config=pagination_config,
@@ -460,14 +459,13 @@ class RoomStateRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomStateRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
- handler = self.handlers.message_handler
# Get all the current state for this room
- events = yield handler.get_state_events(
+ events = yield self.message_handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
is_guest=requester.is_guest,
@@ -525,7 +523,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomEventContextServlet, self).__init__(hs)
self.clock = hs.get_clock()
- self.handlers = hs.get_handlers()
+ self.room_context_handler = hs.get_room_context_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id, event_id):
@@ -533,7 +531,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
limit = parse_integer(request, "limit", default=10)
- results = yield self.handlers.room_context_handler.get_event_context(
+ results = yield self.room_context_handler.get_event_context(
requester.user,
room_id,
event_id,
@@ -832,10 +830,13 @@ def register_servlets(hs, http_server):
RoomSendEventRestServlet(hs).register(http_server)
PublicRoomListRestServlet(hs).register(http_server)
RoomStateRestServlet(hs).register(http_server)
- RoomInitialSyncRestServlet(hs).register(http_server)
RoomRedactEventRestServlet(hs).register(http_server)
RoomTypingRestServlet(hs).register(http_server)
SearchRestServlet(hs).register(http_server)
JoinedRoomsRestServlet(hs).register(http_server)
RoomEventServlet(hs).register(http_server)
RoomEventContextServlet(hs).register(http_server)
+
+
+def register_deprecated_servlets(hs, http_server):
+ RoomInitialSyncRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v1_only/__init__.py b/synapse/rest/client/v1_only/__init__.py
new file mode 100644
index 0000000000..936f902ace
--- /dev/null
+++ b/synapse/rest/client/v1_only/__init__.py
@@ -0,0 +1,3 @@
+"""
+REST APIs that are only used in v1 (the legacy API).
+"""
diff --git a/synapse/rest/client/v1_only/base.py b/synapse/rest/client/v1_only/base.py
new file mode 100644
index 0000000000..9d4db7437c
--- /dev/null
+++ b/synapse/rest/client/v1_only/base.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""This module contains base REST classes for constructing client v1 servlets.
+"""
+
+import re
+
+from synapse.api.urls import CLIENT_PREFIX
+
+
+def v1_only_client_path_patterns(path_regex, include_in_unstable=True):
+ """Creates a regex compiled client path with the correct client path
+ prefix.
+
+ Args:
+ path_regex (str): The regex string to match. This should NOT have a ^
+ as this will be prefixed.
+ Returns:
+ list of SRE_Pattern
+ """
+ patterns = [re.compile("^" + CLIENT_PREFIX + path_regex)]
+ if include_in_unstable:
+ unstable_prefix = CLIENT_PREFIX.replace("/api/v1", "/unstable")
+ patterns.append(re.compile("^" + unstable_prefix + path_regex))
+ return patterns
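
For illustration, with CLIENT_PREFIX at its historical value of
"/_matrix/client/api/v1", the helper produces patterns for both the v1 and
unstable prefixes (the prefix value here is an assumption, not part of this
patch):

    patterns = v1_only_client_path_patterns("/register$")
    # patterns[0].pattern == "^/_matrix/client/api/v1/register$"
    # patterns[1].pattern == "^/_matrix/client/unstable/register$"
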
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1_only/register.py
index 25a143af8d..3439c3c6d4 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1_only/register.py
@@ -24,9 +24,10 @@ import synapse.util.stringutils as stringutils
from synapse.api.constants import LoginType
from synapse.api.errors import Codes, SynapseError
from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request
+from synapse.rest.client.v1.base import ClientV1RestServlet
from synapse.types import create_requester
-from .base import ClientV1RestServlet, client_path_patterns
+from .base import v1_only_client_path_patterns
logger = logging.getLogger(__name__)
@@ -49,7 +50,7 @@ class RegisterRestServlet(ClientV1RestServlet):
handler doesn't have a concept of multi-stages or sessions.
"""
- PATTERNS = client_path_patterns("/register$", releases=(), include_in_unstable=False)
+ PATTERNS = v1_only_client_path_patterns("/register$", include_in_unstable=False)
def __init__(self, hs):
"""
@@ -379,7 +380,7 @@ class CreateUserRestServlet(ClientV1RestServlet):
"""Handles user creation via a server-to-server interface
"""
- PATTERNS = client_path_patterns("/createUser$", releases=())
+ PATTERNS = v1_only_client_path_patterns("/createUser$")
def __init__(self, hs):
super(CreateUserRestServlet, self).__init__(hs)
diff --git a/synapse/secrets.py b/synapse/secrets.py
new file mode 100644
index 0000000000..f397daaa5e
--- /dev/null
+++ b/synapse/secrets.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Injectable secrets module for Synapse.
+
+See https://docs.python.org/3/library/secrets.html#module-secrets for the API
+used in Python 3.6, and the API emulated in Python 2.7.
+"""
+
+import six
+
+if six.PY3:
+ import secrets
+
+ def Secrets():
+ return secrets
+
+
+else:
+
+ import os
+ import binascii
+
+ class Secrets(object):
+ def token_bytes(self, nbytes=32):
+ return os.urandom(nbytes)
+
+ def token_hex(self, nbytes=32):
+ return binascii.hexlify(self.token_bytes(nbytes))
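
Callers reach this through the homeserver, as UserRegisterServlet does
above with hs.get_secrets().token_hex(64). One subtlety worth noting: on
Python 3 token_hex returns a str, while this Python 2 shim returns a
bytestring from binascii.hexlify. A quick sketch:

    secrets = hs.get_secrets()  # assuming `hs` is a HomeServer instance
    nonce = secrets.token_hex(64)  # 128 hex digits; str on PY3, str/bytes on PY2
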
diff --git a/synapse/server.py b/synapse/server.py
index 92bea96c5c..140be9ebe8 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -52,12 +52,13 @@ from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.events import EventHandler, EventStreamHandler
from synapse.handlers.groups_local import GroupsLocalHandler
from synapse.handlers.initial_sync import InitialSyncHandler
-from synapse.handlers.message import EventCreationHandler
+from synapse.handlers.message import EventCreationHandler, MessageHandler
+from synapse.handlers.pagination import PaginationHandler
from synapse.handlers.presence import PresenceHandler
from synapse.handlers.profile import ProfileHandler
from synapse.handlers.read_marker import ReadMarkerHandler
from synapse.handlers.receipts import ReceiptsHandler
-from synapse.handlers.room import RoomCreationHandler
+from synapse.handlers.room import RoomContextHandler, RoomCreationHandler
from synapse.handlers.room_list import RoomListHandler
from synapse.handlers.room_member import RoomMemberMasterHandler
from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
@@ -74,6 +75,7 @@ from synapse.rest.media.v1.media_repository import (
MediaRepository,
MediaRepositoryResource,
)
+from synapse.secrets import Secrets
from synapse.server_notices.server_notices_manager import ServerNoticesManager
from synapse.server_notices.server_notices_sender import ServerNoticesSender
from synapse.server_notices.worker_server_notices_sender import WorkerServerNoticesSender
@@ -158,11 +160,15 @@ class HomeServer(object):
'groups_server_handler',
'groups_attestation_signing',
'groups_attestation_renewer',
+ 'secrets',
'spam_checker',
'room_member_handler',
'federation_registry',
'server_notices_manager',
'server_notices_sender',
+ 'message_handler',
+ 'pagination_handler',
+ 'room_context_handler',
]
def __init__(self, hostname, reactor=None, **kwargs):
@@ -405,6 +411,9 @@ class HomeServer(object):
def build_groups_attestation_renewer(self):
return GroupAttestionRenewer(self)
+ def build_secrets(self):
+ return Secrets()
+
def build_spam_checker(self):
return SpamChecker(self)
@@ -426,6 +435,15 @@ class HomeServer(object):
return WorkerServerNoticesSender(self)
return ServerNoticesSender(self)
+ def build_message_handler(self):
+ return MessageHandler(self)
+
+ def build_pagination_handler(self):
+ return PaginationHandler(self)
+
+ def build_room_context_handler(self):
+ return RoomContextHandler(self)
+
def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
diff --git a/synapse/state.py b/synapse/state.py
index 15a593d41c..033f55d967 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -18,7 +18,7 @@ import hashlib
import logging
from collections import namedtuple
-from six import iteritems, itervalues
+from six import iteritems, iterkeys, itervalues
from frozendict import frozendict
@@ -203,25 +203,27 @@ class StateHandler(object):
# If this is an outlier, then we know it shouldn't have any current
# state. Certainly store.get_current_state won't return any, and
# persisting the event won't store the state group.
- context = EventContext()
if old_state:
- context.prev_state_ids = {
+ prev_state_ids = {
(s.type, s.state_key): s.event_id for s in old_state
}
if event.is_state():
- context.current_state_ids = dict(context.prev_state_ids)
+ current_state_ids = dict(prev_state_ids)
key = (event.type, event.state_key)
- context.current_state_ids[key] = event.event_id
+ current_state_ids[key] = event.event_id
else:
- context.current_state_ids = context.prev_state_ids
+ current_state_ids = prev_state_ids
else:
- context.current_state_ids = {}
- context.prev_state_ids = {}
- context.prev_state_events = []
+ current_state_ids = {}
+ prev_state_ids = {}
# We don't store state for outliers, so we don't generate a state
- # froup for it.
- context.state_group = None
+ # group for it.
+ context = EventContext.with_state(
+ state_group=None,
+ current_state_ids=current_state_ids,
+ prev_state_ids=prev_state_ids,
+ )
defer.returnValue(context)
@@ -230,31 +232,35 @@ class StateHandler(object):
# Let's just correctly fill out the context and create a
# new state group for it.
- context = EventContext()
- context.prev_state_ids = {
+ prev_state_ids = {
(s.type, s.state_key): s.event_id for s in old_state
}
if event.is_state():
key = (event.type, event.state_key)
- if key in context.prev_state_ids:
- replaces = context.prev_state_ids[key]
+ if key in prev_state_ids:
+ replaces = prev_state_ids[key]
if replaces != event.event_id: # Paranoia check
event.unsigned["replaces_state"] = replaces
- context.current_state_ids = dict(context.prev_state_ids)
- context.current_state_ids[key] = event.event_id
+ current_state_ids = dict(prev_state_ids)
+ current_state_ids[key] = event.event_id
else:
- context.current_state_ids = context.prev_state_ids
+ current_state_ids = prev_state_ids
- context.state_group = yield self.store.store_state_group(
+ state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
prev_group=None,
delta_ids=None,
- current_state_ids=context.current_state_ids,
+ current_state_ids=current_state_ids,
+ )
+
+ context = EventContext.with_state(
+ state_group=state_group,
+ current_state_ids=current_state_ids,
+ prev_state_ids=prev_state_ids,
)
- context.prev_state_events = []
defer.returnValue(context)
logger.debug("calling resolve_state_groups from compute_event_context")
@@ -262,47 +268,47 @@ class StateHandler(object):
event.room_id, [e for e, _ in event.prev_events],
)
- curr_state = entry.state
+ prev_state_ids = entry.state
+ prev_group = None
+ delta_ids = None
- context = EventContext()
- context.prev_state_ids = curr_state
if event.is_state():
# If this is a state event then we need to create a new state
# group for the state after this event.
key = (event.type, event.state_key)
- if key in context.prev_state_ids:
- replaces = context.prev_state_ids[key]
+ if key in prev_state_ids:
+ replaces = prev_state_ids[key]
event.unsigned["replaces_state"] = replaces
- context.current_state_ids = dict(context.prev_state_ids)
- context.current_state_ids[key] = event.event_id
+ current_state_ids = dict(prev_state_ids)
+ current_state_ids[key] = event.event_id
if entry.state_group:
# If the state at the event has a state group assigned then
# we can use that as the prev group
- context.prev_group = entry.state_group
- context.delta_ids = {
+ prev_group = entry.state_group
+ delta_ids = {
key: event.event_id
}
elif entry.prev_group:
# If the state at the event only has a prev group, then we can
# use that as a prev group too.
- context.prev_group = entry.prev_group
- context.delta_ids = dict(entry.delta_ids)
- context.delta_ids[key] = event.event_id
+ prev_group = entry.prev_group
+ delta_ids = dict(entry.delta_ids)
+ delta_ids[key] = event.event_id
- context.state_group = yield self.store.store_state_group(
+ state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
- prev_group=context.prev_group,
- delta_ids=context.delta_ids,
- current_state_ids=context.current_state_ids,
+ prev_group=prev_group,
+ delta_ids=delta_ids,
+ current_state_ids=current_state_ids,
)
else:
- context.current_state_ids = context.prev_state_ids
- context.prev_group = entry.prev_group
- context.delta_ids = entry.delta_ids
+ current_state_ids = prev_state_ids
+ prev_group = entry.prev_group
+ delta_ids = entry.delta_ids
if entry.state_group is None:
entry.state_group = yield self.store.store_state_group(
@@ -310,13 +316,20 @@ class StateHandler(object):
event.room_id,
prev_group=entry.prev_group,
delta_ids=entry.delta_ids,
- current_state_ids=context.current_state_ids,
+ current_state_ids=current_state_ids,
)
entry.state_id = entry.state_group
- context.state_group = entry.state_group
+ state_group = entry.state_group
+
+ context = EventContext.with_state(
+ state_group=state_group,
+ current_state_ids=current_state_ids,
+ prev_state_ids=prev_state_ids,
+ prev_group=prev_group,
+ delta_ids=delta_ids,
+ )
- context.prev_state_events = []
defer.returnValue(context)
@defer.inlineCallbacks
@@ -458,69 +471,39 @@ class StateResolutionHandler(object):
"Resolving state for %s with %d groups", room_id, len(state_groups_ids)
)
- # build a map from state key to the event_ids which set that state.
- # dict[(str, str), set[str])
- state = {}
+ # start by assuming we won't have any conflicted state, and build up the new
+ # state map by iterating through the state groups. If we discover a conflict,
+ # we give up and instead use `resolve_events_with_factory`.
+ #
+ # XXX: is this actually worthwhile, or should we just let
+ # resolve_events_with_factory do it?
+ new_state = {}
+ conflicted_state = False
for st in itervalues(state_groups_ids):
for key, e_id in iteritems(st):
- state.setdefault(key, set()).add(e_id)
-
- # build a map from state key to the event_ids which set that state,
- # including only those where there are state keys in conflict.
- conflicted_state = {
- k: list(v)
- for k, v in iteritems(state)
- if len(v) > 1
- }
+ if key in new_state:
+ conflicted_state = True
+ break
+ new_state[key] = e_id
+ if conflicted_state:
+ break
if conflicted_state:
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_factory(
- list(state_groups_ids.values()),
+ list(itervalues(state_groups_ids)),
event_map=event_map,
state_map_factory=state_map_factory,
)
- else:
- new_state = {
- key: e_ids.pop() for key, e_ids in iteritems(state)
- }
- with Measure(self.clock, "state.create_group_ids"):
- # if the new state matches any of the input state groups, we can
- # use that state group again. Otherwise we will generate a state_id
- # which will be used as a cache key for future resolutions, but
- # not get persisted.
- state_group = None
- new_state_event_ids = frozenset(itervalues(new_state))
- for sg, events in iteritems(state_groups_ids):
- if new_state_event_ids == frozenset(e_id for e_id in events):
- state_group = sg
- break
+ # if the new state matches any of the input state groups, we can
+ # use that state group again. Otherwise we will generate a state_id
+ # which will be used as a cache key for future resolutions, but
+ # not get persisted.
- # TODO: We want to create a state group for this set of events, to
- # increase cache hits, but we need to make sure that it doesn't
- # end up as a prev_group without being added to the database
-
- prev_group = None
- delta_ids = None
- for old_group, old_ids in iteritems(state_groups_ids):
- if not set(new_state) - set(old_ids):
- n_delta_ids = {
- k: v
- for k, v in iteritems(new_state)
- if old_ids.get(k) != v
- }
- if not delta_ids or len(n_delta_ids) < len(delta_ids):
- prev_group = old_group
- delta_ids = n_delta_ids
-
- cache = _StateCacheEntry(
- state=new_state,
- state_group=state_group,
- prev_group=prev_group,
- delta_ids=delta_ids,
- )
+ with Measure(self.clock, "state.create_group_ids"):
+ cache = _make_state_cache_entry(new_state, state_groups_ids)
if self._state_cache is not None:
self._state_cache[group_names] = cache
@@ -528,6 +511,70 @@ class StateResolutionHandler(object):
defer.returnValue(cache)
+def _make_state_cache_entry(
+ new_state,
+ state_groups_ids,
+):
+ """Given a resolved state, and a set of input state groups, pick one to base
+ a new state group on (if any), and return an appropriately-constructed
+ _StateCacheEntry.
+
+ Args:
+ new_state (dict[(str, str), str]): resolved state map (mapping from
+ (type, state_key) to event_id)
+
+ state_groups_ids (dict[int, dict[(str, str), str]]):
+ map from state group id to the state in that state group
+ (where 'state' is a map from state key to event id)
+
+ Returns:
+ _StateCacheEntry
+ """
+ # if the new state matches any of the input state groups, we can
+ # use that state group again. Otherwise we will generate a state_id
+ # which will be used as a cache key for future resolutions, but
+ # not get persisted.
+
+ # first look for exact matches
+ new_state_event_ids = set(itervalues(new_state))
+ for sg, state in iteritems(state_groups_ids):
+ if len(new_state_event_ids) != len(state):
+ continue
+
+ old_state_event_ids = set(itervalues(state))
+ if new_state_event_ids == old_state_event_ids:
+ # got an exact match.
+ return _StateCacheEntry(
+ state=new_state,
+ state_group=sg,
+ )
+
+ # TODO: We want to create a state group for this set of events, to
+ # increase cache hits, but we need to make sure that it doesn't
+ # end up as a prev_group without being added to the database
+
+ # failing that, look for the closest match.
+ prev_group = None
+ delta_ids = None
+
+ for old_group, old_state in iteritems(state_groups_ids):
+ n_delta_ids = {
+ k: v
+ for k, v in iteritems(new_state)
+ if old_state.get(k) != v
+ }
+ if not delta_ids or len(n_delta_ids) < len(delta_ids):
+ prev_group = old_group
+ delta_ids = n_delta_ids
+
+ return _StateCacheEntry(
+ state=new_state,
+ state_group=None,
+ prev_group=prev_group,
+ delta_ids=delta_ids,
+ )
+
+
def _ordered_events(events):
def key_func(e):
return -int(e.depth), hashlib.sha1(e.event_id.encode()).hexdigest()
@@ -569,7 +616,7 @@ def _seperate(state_sets):
with them in different state sets.
Args:
- state_sets(list[dict[(str, str), str]]):
+ state_sets(iterable[dict[(str, str), str]]):
List of dicts of (type, state_key) -> event_id, which are the
different state groups to resolve.
@@ -583,10 +630,11 @@ def _seperate(state_sets):
conflicted_state is a dict mapping (type, state_key) to a set of
event ids for conflicted state keys.
"""
- unconflicted_state = dict(state_sets[0])
+ state_set_iterator = iter(state_sets)
+ unconflicted_state = dict(next(state_set_iterator))
conflicted_state = {}
- for state_set in state_sets[1:]:
+ for state_set in state_set_iterator:
for key, value in iteritems(state_set):
# Check if there is an unconflicted entry for the state key.
unconflicted_value = unconflicted_state.get(key)
@@ -647,7 +695,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
for event_id in event_ids
)
if event_map is not None:
- needed_events -= set(event_map.iterkeys())
+ needed_events -= set(iterkeys(event_map))
logger.info("Asking for %d conflicted events", len(needed_events))
@@ -668,7 +716,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
new_needed_events = set(itervalues(auth_events))
new_needed_events -= needed_events
if event_map is not None:
- new_needed_events -= set(event_map.iterkeys())
+ new_needed_events -= set(iterkeys(event_map))
logger.info("Asking for %d auth events", len(new_needed_events))
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index a6a0e6ec9f..1d41d8d445 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -343,10 +343,9 @@ class SQLBaseStore(object):
"""
parent_context = LoggingContext.current_context()
if parent_context == LoggingContext.sentinel:
- # warning disabled for 0.33.0 release; proper fixes will land imminently.
- # logger.warn(
- # "Running db txn from sentinel context: metrics will be lost",
- # )
+ logger.warn(
+ "Starting db connection from sentinel context: metrics will be lost",
+ )
parent_context = None
start_time = time.time()
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index dc9eca7d15..5fe1ca2de7 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -19,6 +19,8 @@ from canonicaljson import json
from twisted.internet import defer
+from synapse.metrics.background_process_metrics import run_as_background_process
+
from . import engines
from ._base import SQLBaseStore
@@ -87,10 +89,14 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_handlers = {}
self._all_done = False
- @defer.inlineCallbacks
def start_doing_background_updates(self):
- logger.info("Starting background schema updates")
+ run_as_background_process(
+ "background_updates", self._run_background_updates,
+ )
+ @defer.inlineCallbacks
+ def _run_background_updates(self):
+ logger.info("Starting background schema updates")
while True:
yield self.hs.get_clock().sleep(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index b78eda3413..cf796242b8 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -19,6 +19,7 @@ from six import iteritems
from twisted.internet import defer
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import CACHE_SIZE_FACTOR
from . import background_updates
@@ -29,7 +30,7 @@ logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
+LAST_SEEN_GRANULARITY = 10 * 60 * 1000
class ClientIpStore(background_updates.BackgroundUpdateStore):
@@ -93,10 +94,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
self._batch_row_update[key] = (user_agent, device_id, now)
def _update_client_ips_batch(self):
- to_update = self._batch_row_update
- self._batch_row_update = {}
- return self.runInteraction(
- "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
+ def update():
+ to_update = self._batch_row_update
+ self._batch_row_update = {}
+ return self.runInteraction(
+ "_update_client_ips_batch", self._update_client_ips_batch_txn,
+ to_update,
+ )
+
+ run_as_background_process(
+ "update_client_ips", update,
)
def _update_client_ips_batch_txn(self, txn, to_update):
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index ec68e39f1e..cc3cdf2ebc 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -248,17 +248,31 @@ class DeviceStore(SQLBaseStore):
def _update_remote_device_list_cache_entry_txn(self, txn, user_id, device_id,
content, stream_id):
- self._simple_upsert_txn(
- txn,
- table="device_lists_remote_cache",
- keyvalues={
- "user_id": user_id,
- "device_id": device_id,
- },
- values={
- "content": json.dumps(content),
- }
- )
+ if content.get("deleted"):
+ self._simple_delete_txn(
+ txn,
+ table="device_lists_remote_cache",
+ keyvalues={
+ "user_id": user_id,
+ "device_id": device_id,
+ },
+ )
+
+ txn.call_after(
+ self.device_id_exists_cache.invalidate, (user_id, device_id,)
+ )
+ else:
+ self._simple_upsert_txn(
+ txn,
+ table="device_lists_remote_cache",
+ keyvalues={
+ "user_id": user_id,
+ "device_id": device_id,
+ },
+ values={
+ "content": json.dumps(content),
+ }
+ )
txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
@@ -366,7 +380,7 @@ class DeviceStore(SQLBaseStore):
now_stream_id = max(stream_id for stream_id in itervalues(query_map))
devices = self._get_e2e_device_keys_txn(
- txn, query_map.keys(), include_all_devices=True
+ txn, query_map.keys(), include_all_devices=True, include_deleted_devices=True
)
prev_sent_id_sql = """
@@ -393,12 +407,15 @@ class DeviceStore(SQLBaseStore):
prev_id = stream_id
- key_json = device.get("key_json", None)
- if key_json:
- result["keys"] = json.loads(key_json)
- device_display_name = device.get("device_display_name", None)
- if device_display_name:
- result["device_display_name"] = device_display_name
+ if device is not None:
+ key_json = device.get("key_json", None)
+ if key_json:
+ result["keys"] = json.loads(key_json)
+ device_display_name = device.get("device_display_name", None)
+ if device_display_name:
+ result["device_display_name"] = device_display_name
+ else:
+ result["deleted"] = True
results.append(result)
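The two halves of this change cooperate: deleted remote devices are dropped from the cache rather than upserted, and the outbound device-list stream now flags them explicitly instead of silently omitting their keys. A small sketch of the result-shaping branch, with a plain dict standing in for the store row:

    import json

    def shape_device_update(result, device):
        # `device` is the row from the e2e key store, or None when the
        # device no longer exists (include_deleted_devices=True).
        if device is not None:
            key_json = device.get("key_json")
            if key_json:
                result["keys"] = json.loads(key_json)
            display_name = device.get("device_display_name")
            if display_name:
                result["device_display_name"] = display_name
        else:
            result["deleted"] = True
        return result

    print(shape_device_update({"device_id": "DEV1"}, None))
    # {'device_id': 'DEV1', 'deleted': True}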
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 7ae5c65482..523b4360c3 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -64,12 +64,18 @@ class EndToEndKeyStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def get_e2e_device_keys(self, query_list, include_all_devices=False):
+ def get_e2e_device_keys(
+ self, query_list, include_all_devices=False,
+ include_deleted_devices=False,
+ ):
"""Fetch a list of device keys.
Args:
query_list(list): List of pairs of user_ids and device_ids.
include_all_devices (bool): whether to include entries for devices
that don't have device keys
+ include_deleted_devices (bool): whether to include null entries for
+ devices which no longer exist (but were in the query_list).
+ This option only takes effect if include_all_devices is true.
Returns:
Dict mapping from user-id to dict mapping from device_id to
dict containing "key_json", "device_display_name".
@@ -79,7 +85,7 @@ class EndToEndKeyStore(SQLBaseStore):
results = yield self.runInteraction(
"get_e2e_device_keys", self._get_e2e_device_keys_txn,
- query_list, include_all_devices,
+ query_list, include_all_devices, include_deleted_devices,
)
for user_id, device_keys in iteritems(results):
@@ -88,10 +94,19 @@ class EndToEndKeyStore(SQLBaseStore):
defer.returnValue(results)
- def _get_e2e_device_keys_txn(self, txn, query_list, include_all_devices):
+ def _get_e2e_device_keys_txn(
+ self, txn, query_list, include_all_devices=False,
+ include_deleted_devices=False,
+ ):
query_clauses = []
query_params = []
+ if include_all_devices is False:
+ include_deleted_devices = False
+
+ if include_deleted_devices:
+ deleted_devices = set(query_list)
+
for (user_id, device_id) in query_list:
query_clause = "user_id = ?"
query_params.append(user_id)
@@ -119,8 +134,14 @@ class EndToEndKeyStore(SQLBaseStore):
result = {}
for row in rows:
+ if include_deleted_devices:
+ deleted_devices.remove((row["user_id"], row["device_id"]))
result.setdefault(row["user_id"], {})[row["device_id"]] = row
+ if include_deleted_devices:
+ for user_id, device_id in deleted_devices:
+ result.setdefault(user_id, {})[device_id] = None
+
return result
@defer.inlineCallbacks
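The include_deleted_devices bookkeeping is clearest with concrete data: seed a set with every queried pair, discard each pair the query actually returns, and whatever remains must have been deleted. A self-contained approximation (the rows are illustrative stand-ins for the SQL results):

    query_list = [("@alice:hs", "D1"), ("@alice:hs", "D2")]
    rows = [{"user_id": "@alice:hs", "device_id": "D1", "key_json": "{}"}]

    deleted_devices = set(query_list)
    result = {}
    for row in rows:
        deleted_devices.remove((row["user_id"], row["device_id"]))
        result.setdefault(row["user_id"], {})[row["device_id"]] = row

    for user_id, device_id in deleted_devices:
        result.setdefault(user_id, {})[device_id] = None

    print(result["@alice:hs"]["D2"])  # None: queried, but no longer exists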
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2aaab0d02c..906a405031 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,7 +19,7 @@ import logging
from collections import OrderedDict, deque, namedtuple
from functools import wraps
-from six import iteritems, itervalues
+from six import iteritems
from six.moves import range
from canonicaljson import json
@@ -33,6 +33,7 @@ from synapse.api.errors import SynapseError
# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
@@ -146,7 +147,8 @@ class _EventPeristenceQueue(object):
# callbacks on the deferred.
try:
ret = yield per_item_callback(item)
- item.deferred.callback(ret)
+ with PreserveLoggingContext():
+ item.deferred.callback(ret)
except Exception:
item.deferred.errback()
finally:
@@ -155,11 +157,8 @@ class _EventPeristenceQueue(object):
self._event_persist_queues[room_id] = queue
self._currently_persisting_rooms.discard(room_id)
- # set handle_queue_loop off on the background. We don't want to
- # attribute work done in it to the current request, so we drop the
- # logcontext altogether.
- with PreserveLoggingContext():
- handle_queue_loop()
+ # set handle_queue_loop off in the background
+ run_as_background_process("persist_events", handle_queue_loop)
def _get_drainining_queue(self, room_id):
queue = self._event_persist_queues.setdefault(room_id, deque())
@@ -345,11 +344,14 @@ class EventsStore(EventsWorkerStore):
new_forward_extremeties = {}
# map room_id->(type,state_key)->event_id tracking the full
- # state in each room after adding these events
+ # state in each room after adding these events.
+ # This is simply used to prefill the get_current_state_ids
+ # cache
current_state_for_room = {}
- # map room_id->(to_delete, to_insert) where each entry is
- # a map (type,key)->event_id giving the state delta in each
+ # map room_id->(to_delete, to_insert) where to_delete is a list
+ # of type/state keys to remove from current state, and to_insert
+ # is a map (type,key)->event_id giving the state delta in each
# room
state_delta_for_room = {}
@@ -419,19 +421,40 @@ class EventsStore(EventsWorkerStore):
logger.info(
"Calculating state delta for room %s", room_id,
)
- current_state = yield self._get_new_state_after_events(
- room_id,
- ev_ctx_rm,
- latest_event_ids,
- new_latest_event_ids,
- )
+ with Measure(
+ self._clock,
+ "persist_events.get_new_state_after_events",
+ ):
+ res = yield self._get_new_state_after_events(
+ room_id,
+ ev_ctx_rm,
+ latest_event_ids,
+ new_latest_event_ids,
+ )
+ current_state, delta_ids = res
+
+ # If either is not None then there has been a change,
+ # and we need to work out the delta (or use the one
+ # we were given)
+ if delta_ids is not None:
+ # If there is a delta we know that we've
+ # only added or replaced state, never
+ # removed keys entirely.
+ state_delta_for_room[room_id] = ([], delta_ids)
+ elif current_state is not None:
+ with Measure(
+ self._clock,
+ "persist_events.calculate_state_delta",
+ ):
+ delta = yield self._calculate_state_delta(
+ room_id, current_state,
+ )
+ state_delta_for_room[room_id] = delta
+
+ # If we have the current_state then let's prefill
+ # the cache with it.
if current_state is not None:
current_state_for_room[room_id] = current_state
- delta = yield self._calculate_state_delta(
- room_id, current_state,
- )
- if delta is not None:
- state_delta_for_room[room_id] = delta
yield self.runInteraction(
"persist_events",
@@ -530,9 +553,15 @@ class EventsStore(EventsWorkerStore):
the new forward extremities for the room.
Returns:
- Deferred[dict[(str,str), str]|None]:
- None if there are no changes to the room state, or
- a dict of (type, state_key) -> event_id].
+ Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]:
+ Returns a tuple of two state maps, the first being the full new current
+ state and the second being the delta to the existing current state.
+ If both are None then there has been no change.
+
+ If there has been a change then we only return the delta if it's
+ already been calculated. Conversely, if we do know the delta then
+ the new current state is only returned if we've already calculated
+ it.
"""
if not new_latest_event_ids:
@@ -540,6 +569,10 @@ class EventsStore(EventsWorkerStore):
# map from state_group to ((type, key) -> event_id) state map
state_groups_map = {}
+
+ # Map from (prev state group, new state group) -> delta state dict
+ state_group_deltas = {}
+
for ev, ctx in events_context:
if ctx.state_group is None:
# I don't think this can happen, but let's double-check
@@ -551,7 +584,15 @@ class EventsStore(EventsWorkerStore):
if ctx.state_group in state_groups_map:
continue
- state_groups_map[ctx.state_group] = ctx.current_state_ids
+ # We're only interested in pulling out state that has already
+ # been cached in the context. We'll pull stuff out of the DB later
+ # if necessary.
+ current_state_ids = ctx.get_cached_current_state_ids()
+ if current_state_ids is not None:
+ state_groups_map[ctx.state_group] = current_state_ids
+
+ if ctx.prev_group:
+ state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
# We need to map the event_ids to their state groups. First, let's
# check if the event is one we're persisting, in which case we can
@@ -594,7 +635,26 @@ class EventsStore(EventsWorkerStore):
# If the old and new groups are the same then we don't need to do
# anything.
if old_state_groups == new_state_groups:
- return
+ defer.returnValue((None, None))
+
+ if len(new_state_groups) == 1 and len(old_state_groups) == 1:
+ # If we're going from one state group to another, let's check if
+ # we have a delta for that transition. If we do then we can just
+ # return that.
+
+ new_state_group = next(iter(new_state_groups))
+ old_state_group = next(iter(old_state_groups))
+
+ delta_ids = state_group_deltas.get(
+ (old_state_group, new_state_group,), None
+ )
+ if delta_ids is not None:
+ # We have a delta from the existing to the new current state,
+ # so let's just return that. If we happen to already have
+ # the current state in memory then let's also return that,
+ # but it doesn't matter if we don't.
+ new_state = state_groups_map.get(new_state_group)
+ defer.returnValue((new_state, delta_ids))
# Now that we have calculated new_state_groups we need to get
# their state IDs so we can resolve to a single state set.
@@ -606,7 +666,7 @@ class EventsStore(EventsWorkerStore):
if len(new_state_groups) == 1:
# If there is only one state group, then we know what the current
# state is.
- defer.returnValue(state_groups_map[new_state_groups.pop()])
+ defer.returnValue((state_groups_map[new_state_groups.pop()], None))
# Ok, we need to defer to the state handler to resolve our state sets.
@@ -625,7 +685,7 @@ class EventsStore(EventsWorkerStore):
room_id, state_groups, events_map, get_events
)
- defer.returnValue(res.state)
+ defer.returnValue((res.state, None))
@defer.inlineCallbacks
def _calculate_state_delta(self, room_id, current_state):
@@ -634,28 +694,20 @@ class EventsStore(EventsWorkerStore):
Assumes that we are only persisting events for one room at a time.
Returns:
- 2-tuple (to_delete, to_insert) where both are state dicts,
- i.e. (type, state_key) -> event_id. `to_delete` are the entries to
- first be deleted from current_state_events, `to_insert` are entries
- to insert.
+ tuple[list, dict] (to_delete, to_insert): where to_delete are the
+ type/state_keys to remove from current_state_events and `to_insert`
+ are the updates to current_state_events.
"""
existing_state = yield self.get_current_state_ids(room_id)
- existing_events = set(itervalues(existing_state))
- new_events = set(ev_id for ev_id in itervalues(current_state))
- changed_events = existing_events ^ new_events
-
- if not changed_events:
- return
+ to_delete = [
+ key for key in existing_state
+ if key not in current_state
+ ]
- to_delete = {
- key: ev_id for key, ev_id in iteritems(existing_state)
- if ev_id in changed_events
- }
- events_to_insert = (new_events - existing_events)
to_insert = {
key: ev_id for key, ev_id in iteritems(current_state)
- if ev_id in events_to_insert
+ if ev_id != existing_state.get(key)
}
defer.returnValue((to_delete, to_insert))
@@ -678,10 +730,10 @@ class EventsStore(EventsWorkerStore):
delete_existing (bool): True to purge existing table rows for the
events from the database. This is useful when retrying due to
IntegrityError.
- state_delta_for_room (dict[str, (list[str], list[str])]):
+ state_delta_for_room (dict[str, (list, dict)]):
The current-state delta for each room. For each room, a tuple
- (to_delete, to_insert), being a list of event ids to be removed
- from the current state, and a list of event ids to be added to
+ (to_delete, to_insert), being a list of type/state keys to be
+ removed from the current state, and a state set to be added to
the current state.
new_forward_extremeties (dict[str, list[str]]):
The new forward extremities for each room. For each room, a
@@ -759,9 +811,46 @@ class EventsStore(EventsWorkerStore):
def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple
+
+ # First we add entries to the current_state_delta_stream. We
+ # do this before updating the current_state_events table so
+ # that we can use it to calculate the `prev_event_id`. (This
+ # saves us from having to pull out the existing state
+ # unnecessarily.)
+ sql = """
+ INSERT INTO current_state_delta_stream
+ (stream_id, room_id, type, state_key, event_id, prev_event_id)
+ SELECT ?, ?, ?, ?, ?, (
+ SELECT event_id FROM current_state_events
+ WHERE room_id = ? AND type = ? AND state_key = ?
+ )
+ """
+ txn.executemany(sql, (
+ (
+ max_stream_order, room_id, etype, state_key, None,
+ room_id, etype, state_key,
+ )
+ for etype, state_key in to_delete
+ # We sanity check that we're deleting rather than updating
+ if (etype, state_key) not in to_insert
+ ))
+ txn.executemany(sql, (
+ (
+ max_stream_order, room_id, etype, state_key, ev_id,
+ room_id, etype, state_key,
+ )
+ for (etype, state_key), ev_id in iteritems(to_insert)
+ ))
+
+ # Now we actually update the current_state_events table
+
txn.executemany(
- "DELETE FROM current_state_events WHERE event_id = ?",
- [(ev_id,) for ev_id in itervalues(to_delete)],
+ "DELETE FROM current_state_events"
+ " WHERE room_id = ? AND type = ? AND state_key = ?",
+ (
+ (room_id, etype, state_key)
+ for etype, state_key in itertools.chain(to_delete, to_insert)
+ ),
)
self._simple_insert_many_txn(
@@ -778,25 +867,6 @@ class EventsStore(EventsWorkerStore):
],
)
- state_deltas = {key: None for key in to_delete}
- state_deltas.update(to_insert)
-
- self._simple_insert_many_txn(
- txn,
- table="current_state_delta_stream",
- values=[
- {
- "stream_id": max_stream_order,
- "room_id": room_id,
- "type": key[0],
- "state_key": key[1],
- "event_id": ev_id,
- "prev_event_id": to_delete.get(key, None),
- }
- for key, ev_id in iteritems(state_deltas)
- ]
- )
-
txn.call_after(
self._curr_state_delta_stream_cache.entity_has_changed,
room_id, max_stream_order,
@@ -810,7 +880,8 @@ class EventsStore(EventsWorkerStore):
# and which we have added, then we invalidate the caches for all
# those users.
members_changed = set(
- state_key for ev_type, state_key in state_deltas
+ state_key
+ for ev_type, state_key in itertools.chain(to_delete, to_insert)
if ev_type == EventTypes.Member
)
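The reworked _calculate_state_delta rewards a worked example: to_delete is now a list of (type, state_key) pairs that vanish from current state, while to_insert carries every key whose event_id is new or changed, which is exactly the shape the executemany statements above consume:

    existing_state = {
        ("m.room.member", "@alice:hs"): "$old_member",
        ("m.room.topic", ""): "$topic",
    }
    current_state = {
        ("m.room.member", "@alice:hs"): "$new_member",  # updated
        ("m.room.name", ""): "$name",                   # newly added
    }                                                    # topic: removed

    to_delete = [key for key in existing_state if key not in current_state]
    to_insert = {
        key: ev_id for key, ev_id in current_state.items()
        if ev_id != existing_state.get(key)
    }
    print(to_delete)  # [('m.room.topic', '')]
    print(to_insert)  # the member update and the new room name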
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 6b8a8e908f..9b4cfeb899 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -25,6 +25,7 @@ from synapse.events import EventBase # noqa: F401
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import (
LoggingContext,
PreserveLoggingContext,
@@ -330,10 +331,11 @@ class EventsWorkerStore(SQLBaseStore):
should_start = False
if should_start:
- with PreserveLoggingContext():
- self.runWithConnection(
- self._do_fetch
- )
+ run_as_background_process(
+ "fetch_events",
+ self.runWithConnection,
+ self._do_fetch,
+ )
logger.debug("Loading %d events", len(events))
with PreserveLoggingContext():
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index be655d287b..6a5028961d 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -21,7 +21,6 @@ from canonicaljson import json
from twisted.internet import defer
-from synapse.api.constants import EventTypes
from synapse.push.baserules import list_with_base_rules
from synapse.storage.appservice import ApplicationServiceWorkerStore
from synapse.storage.pusher import PusherWorkerStore
@@ -186,6 +185,7 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
defer.returnValue(results)
+ @defer.inlineCallbacks
def bulk_get_push_rules_for_room(self, event, context):
state_group = context.state_group
if not state_group:
@@ -195,9 +195,11 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
# To do this we set the state_group to a new object as object() != object()
state_group = object()
- return self._bulk_get_push_rules_for_room(
- event.room_id, state_group, context.current_state_ids, event=event
+ current_state_ids = yield context.get_current_state_ids(self)
+ result = yield self._bulk_get_push_rules_for_room(
+ event.room_id, state_group, current_state_ids, event=event
)
+ defer.returnValue(result)
@cachedInlineCallbacks(num_args=2, cache_context=True)
def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state_ids,
@@ -247,18 +249,6 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
if uid in local_users_in_room:
user_ids.add(uid)
- forgotten = yield self.who_forgot_in_room(
- event.room_id, on_invalidate=cache_context.invalidate,
- )
-
- for row in forgotten:
- user_id = row["user_id"]
- event_id = row["event_id"]
-
- mem_id = current_state_ids.get((EventTypes.Member, user_id), None)
- if event_id == mem_id:
- user_ids.discard(user_id)
-
rules_by_user = yield self.bulk_get_push_rules(
user_ids, on_invalidate=cache_context.invalidate,
)
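The inlineCallbacks conversion follows from EventContext now loading its state lazily: current_state_ids is no longer a plain attribute but something to yield for, passing in the store. A minimal Twisted sketch with a stand-in context (FakeContext is illustrative, not synapse's EventContext):

    from twisted.internet import defer

    class FakeContext(object):
        @defer.inlineCallbacks
        def get_current_state_ids(self, store):
            # The real context may hit the database via `store`; this
            # stand-in just returns a canned state map.
            yield defer.succeed(None)
            defer.returnValue({("m.room.member", "@alice:hs"): "$ev"})

    @defer.inlineCallbacks
    def bulk_get_push_rules_for_room(context, store):
        current_state_ids = yield context.get_current_state_ids(store)
        defer.returnValue(sorted(current_state_ids))

    d = bulk_get_push_rules_for_room(FakeContext(), store=None)
    d.addCallback(print)  # [('m.room.member', '@alice:hs')]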
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index cc273a57b2..8443bd4c1b 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -233,7 +233,7 @@ class PusherStore(PusherWorkerStore):
)
if newly_inserted:
- self.runInteraction(
+ yield self.runInteraction(
"add_pusher",
self._invalidate_cache_and_stream,
self.get_if_user_has_pusher, (user_id,)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 02a802bed9..009b91dd52 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -66,7 +66,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
hosts = frozenset(get_domain_from_id(user_id) for user_id in user_ids)
defer.returnValue(hosts)
- @cached(max_entries=100000, iterable=True)
+ @cachedInlineCallbacks(max_entries=100000, iterable=True)
def get_users_in_room(self, room_id):
def f(txn):
sql = (
@@ -80,7 +80,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
txn.execute(sql, (room_id, Membership.JOIN,))
return [to_ascii(r[0]) for r in txn]
- return self.runInteraction("get_users_in_room", f)
+ start_time = self._clock.time_msec()
+ result = yield self.runInteraction("get_users_in_room", f)
+ end_time = self._clock.time_msec()
+ logger.info(
+ "Fetched room membership for %s (%i users) in %i ms",
+ room_id, len(result), end_time - start_time,
+ )
+ defer.returnValue(result)
@cached()
def get_invited_rooms_for_user(self, user_id):
@@ -232,6 +239,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
defer.returnValue(user_who_share_room)
+ @defer.inlineCallbacks
def get_joined_users_from_context(self, event, context):
state_group = context.state_group
if not state_group:
@@ -241,11 +249,13 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# To do this we set the state_group to a new object as object() != object()
state_group = object()
- return self._get_joined_users_from_context(
- event.room_id, state_group, context.current_state_ids,
+ current_state_ids = yield context.get_current_state_ids(self)
+ result = yield self._get_joined_users_from_context(
+ event.room_id, state_group, current_state_ids,
event=event,
context=context,
)
+ defer.returnValue(result)
def get_joined_users_from_state(self, room_id, state_entry):
state_group = state_entry.state_group
@@ -458,17 +468,29 @@ class RoomMemberWorkerStore(EventsWorkerStore):
def _get_joined_hosts_cache(self, room_id):
return _JoinedHostsCache(self, room_id)
- @cached()
- def who_forgot_in_room(self, room_id):
- return self._simple_select_list(
- table="room_memberships",
- retcols=("user_id", "event_id"),
- keyvalues={
- "room_id": room_id,
- "forgotten": 1,
- },
- desc="who_forgot"
- )
+ @cachedInlineCallbacks(num_args=2)
+ def did_forget(self, user_id, room_id):
+ """Returns whether user_id has elected to discard history for room_id.
+
+ Returns False if they have since re-joined."""
+ def f(txn):
+ sql = (
+ "SELECT"
+ " COUNT(*)"
+ " FROM"
+ " room_memberships"
+ " WHERE"
+ " user_id = ?"
+ " AND"
+ " room_id = ?"
+ " AND"
+ " forgotten = 0"
+ )
+ txn.execute(sql, (user_id, room_id))
+ rows = txn.fetchall()
+ return rows[0][0]
+ count = yield self.runInteraction("did_forget_membership", f)
+ defer.returnValue(count == 0)
class RoomMemberStore(RoomMemberWorkerStore):
@@ -577,36 +599,11 @@ class RoomMemberStore(RoomMemberWorkerStore):
)
txn.execute(sql, (user_id, room_id))
- txn.call_after(self.did_forget.invalidate, (user_id, room_id))
self._invalidate_cache_and_stream(
- txn, self.who_forgot_in_room, (room_id,)
+ txn, self.did_forget, (user_id, room_id,),
)
return self.runInteraction("forget_membership", f)
- @cachedInlineCallbacks(num_args=2)
- def did_forget(self, user_id, room_id):
- """Returns whether user_id has elected to discard history for room_id.
-
- Returns False if they have since re-joined."""
- def f(txn):
- sql = (
- "SELECT"
- " COUNT(*)"
- " FROM"
- " room_memberships"
- " WHERE"
- " user_id = ?"
- " AND"
- " room_id = ?"
- " AND"
- " forgotten = 0"
- )
- txn.execute(sql, (user_id, room_id))
- rows = txn.fetchall()
- return rows[0][0]
- count = yield self.runInteraction("did_forget_membership", f)
- defer.returnValue(count == 0)
-
@defer.inlineCallbacks
def _background_add_membership_profile(self, progress, batch_size):
target_min_stream_id = progress.get(
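The did_forget query reads: the user has forgotten the room only if no membership row for them in that room still has forgotten = 0, so any re-join (which writes a fresh row with forgotten = 0) flips the answer back to False. A runnable sketch of the same logic against an in-memory SQLite table:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE room_memberships (user_id TEXT, room_id TEXT, forgotten INT)"
    )
    conn.execute("INSERT INTO room_memberships VALUES ('@a:hs', '!r:hs', 1)")
    conn.execute("INSERT INTO room_memberships VALUES ('@a:hs', '!r:hs', 0)")

    def did_forget(user_id, room_id):
        (count,) = conn.execute(
            "SELECT COUNT(*) FROM room_memberships"
            " WHERE user_id = ? AND room_id = ? AND forgotten = 0",
            (user_id, room_id),
        ).fetchone()
        return count == 0

    print(did_forget("@a:hs", "!r:hs"))  # False: the re-join row has forgotten = 0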
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index d5b5df93e6..a35291a3f6 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -724,7 +724,7 @@ def _parse_query(database_engine, search_term):
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
if isinstance(database_engine, PostgresEngine):
- return " & ".join(result + ":*" for result in results)
+ return " & ".join(result for result in results)
elif isinstance(database_engine, Sqlite3Engine):
return " & ".join(result + "*" for result in results)
else:
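The small-looking change to _parse_query is behavioural: dropping the ":*" suffix turns Postgres full-text prefix matching into exact lexeme matching. A before/after comparison:

    import re

    def parse_query_old(search_term):
        results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
        return " & ".join(result + ":*" for result in results)

    def parse_query_new(search_term):
        results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
        return " & ".join(result for result in results)

    print(parse_query_old("hello wor"))  # hello:* & wor:*  (matches "world")
    print(parse_query_new("hello wor"))  # hello & wor      (exact terms only)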
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 5d0fb39130..a7094e2fb4 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import collections
import logging
from contextlib import contextmanager

+from twisted.internet.defer import CancelledError
@@ -156,54 +157,72 @@ def concurrently_execute(func, args, limit):
class Linearizer(object):
- """Linearizes access to resources based on a key. Useful to ensure only one
- thing is happening at a time on a given resource.
+ """Limits concurrent access to resources based on a key. Useful to ensure
+ only a few things happen at a time on a given resource.
Example:
- with (yield linearizer.queue("test_key")):
+ with (yield limiter.queue("test_key")):
# do some work.
"""
- def __init__(self, name=None, clock=None):
+ def __init__(self, name=None, max_count=1, clock=None):
+ """
+ Args:
+ max_count(int): The maximum number of concurrent accesses
+ """
if name is None:
self.name = id(self)
else:
self.name = name
- self.key_to_defer = {}
if not clock:
from twisted.internet import reactor
clock = Clock(reactor)
self._clock = clock
+ self.max_count = max_count
+
+ # key_to_defer is a map from the key to a 2 element list where
+ # the first element is the number of things executing, and
+ # the second element is an OrderedDict, where the keys are deferreds for the
+ # things blocked from executing.
+ self.key_to_defer = {}
@defer.inlineCallbacks
def queue(self, key):
- # If there is already a deferred in the queue, we pull it out so that
- # we can wait on it later.
- # Then we replace it with a deferred that we resolve *after* the
- # context manager has exited.
- # We only return the context manager after the previous deferred has
- # resolved.
- # This all has the net effect of creating a chain of deferreds that
- # wait for the previous deferred before starting their work.
- current_defer = self.key_to_defer.get(key)
+ entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
- new_defer = defer.Deferred()
- self.key_to_defer[key] = new_defer
+ # If the number of things executing is at or over the maximum
+ # then add a deferred to the list of blocked items.
+ # When one of the things currently executing finishes it will call back
+ # this item so that it can continue executing.
+ if entry[0] >= self.max_count:
+ new_defer = defer.Deferred()
+ entry[1][new_defer] = 1
- if current_defer:
logger.info(
- "Waiting to acquire linearizer lock %r for key %r", self.name, key
+ "Waiting to acquire linearizer lock %r for key %r", self.name, key,
)
try:
- with PreserveLoggingContext():
- yield current_defer
- except Exception:
- logger.exception("Unexpected exception in Linearizer")
-
- logger.info("Acquired linearizer lock %r for key %r", self.name,
- key)
+ yield make_deferred_yieldable(new_defer)
+ except Exception as e:
+ if isinstance(e, CancelledError):
+ logger.info(
+ "Cancelling wait for linearizer lock %r for key %r",
+ self.name, key,
+ )
+ else:
+ logger.warn(
+ "Unexpected exception waiting for linearizer lock %r for key %r",
+ self.name, key,
+ )
+
+ # we just have to take ourselves back out of the queue.
+ del entry[1][new_defer]
+ raise
+
+ logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+ entry[0] += 1
# if the code holding the lock completes synchronously, then it
# will recursively run the next claimant on the list. That can
@@ -213,15 +232,15 @@ class Linearizer(object):
# In order to break the cycle, we add a cheeky sleep(0) here to
# ensure that we fall back to the reactor between each iteration.
#
- # (There's no particular need for it to happen before we return
- # the context manager, but it needs to happen while we hold the
- # lock, and the context manager's exit code must be synchronous,
- # so actually this is the only sensible place.
+ # (This needs to happen while we hold the lock, and the context manager's exit
+ # code must be synchronous, so this is the only sensible place.)
yield self._clock.sleep(0)
else:
- logger.info("Acquired uncontended linearizer lock %r for key %r",
- self.name, key)
+ logger.info(
+ "Acquired uncontended linearizer lock %r for key %r", self.name, key,
+ )
+ entry[0] += 1
@contextmanager
def _ctx_manager():
@@ -229,73 +248,15 @@ class Linearizer(object):
yield
finally:
logger.info("Releasing linearizer lock %r for key %r", self.name, key)
- with PreserveLoggingContext():
- new_defer.callback(None)
- current_d = self.key_to_defer.get(key)
- if current_d is new_defer:
- self.key_to_defer.pop(key, None)
-
- defer.returnValue(_ctx_manager())
-
-
-class Limiter(object):
- """Limits concurrent access to resources based on a key. Useful to ensure
- only a few thing happen at a time on a given resource.
-
- Example:
-
- with (yield limiter.queue("test_key")):
- # do some work.
-
- """
- def __init__(self, max_count):
- """
- Args:
- max_count(int): The maximum number of concurrent access
- """
- self.max_count = max_count
-
- # key_to_defer is a map from the key to a 2 element list where
- # the first element is the number of things executing
- # the second element is a list of deferreds for the things blocked from
- # executing.
- self.key_to_defer = {}
-
- @defer.inlineCallbacks
- def queue(self, key):
- entry = self.key_to_defer.setdefault(key, [0, []])
-
- # If the number of things executing is greater than the maximum
- # then add a deferred to the list of blocked items
- # When on of the things currently executing finishes it will callback
- # this item so that it can continue executing.
- if entry[0] >= self.max_count:
- new_defer = defer.Deferred()
- entry[1].append(new_defer)
-
- logger.info("Waiting to acquire limiter lock for key %r", key)
- with PreserveLoggingContext():
- yield new_defer
- logger.info("Acquired limiter lock for key %r", key)
- else:
- logger.info("Acquired uncontended limiter lock for key %r", key)
-
- entry[0] += 1
-
- @contextmanager
- def _ctx_manager():
- try:
- yield
- finally:
- logger.info("Releasing limiter lock for key %r", key)
# We've finished executing so check if there are any things
# blocked waiting to execute and start one of them
entry[0] -= 1
if entry[1]:
- next_def = entry[1].pop(0)
+ (next_def, _) = entry[1].popitem(last=False)
+ # we need to run the next thing in the sentinel context.
with PreserveLoggingContext():
next_def.callback(None)
elif entry[0] == 0:
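The merged class keeps the Limiter's bookkeeping: each key maps to a two-element list of [number executing, OrderedDict of waiters], and popitem(last=False) hands the lock on in FIFO order. A stripped-down synchronous sketch of that accounting (plain callables stand in for the Deferreds the real class uses; with max_count=1 it degenerates to the old Linearizer):

    import collections

    class MiniLimiter(object):
        def __init__(self, max_count=1):
            self.max_count = max_count
            self.key_to_defer = {}

        def acquire(self, key, on_ready):
            entry = self.key_to_defer.setdefault(
                key, [0, collections.OrderedDict()],
            )
            if entry[0] >= self.max_count:
                entry[1][on_ready] = 1  # queue in FIFO order
            else:
                entry[0] += 1
                on_ready()

        def release(self, key):
            entry = self.key_to_defer[key]
            entry[0] -= 1
            if entry[1]:
                next_ready, _ = entry[1].popitem(last=False)
                entry[0] += 1
                next_ready()
            elif entry[0] == 0:
                del self.key_to_defer[key]

    limiter = MiniLimiter(max_count=2)
    for i in range(3):
        limiter.acquire("k", lambda i=i: print("running %d" % i))
    limiter.release("k")  # frees a slot, so the queued third task runs

In the real class queue() resolves to a context manager, so call sites read `with (yield limiter.queue(key)):` and release happens on exit.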
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 4abca91f6d..465adc54a8 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,6 +16,7 @@
import logging
from collections import OrderedDict
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache
logger = logging.getLogger(__name__)
@@ -63,7 +64,10 @@ class ExpiringCache(object):
return
def f():
- self._prune_cache()
+ run_as_background_process(
+ "prune_cache_%s" % self._cache_name,
+ self._prune_cache,
+ )
self._clock.looping_call(f, self._expiry_ms / 2)
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 734331caaa..194da87639 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -17,20 +17,18 @@ import logging
from twisted.internet import defer
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
def user_left_room(distributor, user, room_id):
- with PreserveLoggingContext():
- distributor.fire("user_left_room", user=user, room_id=room_id)
+ distributor.fire("user_left_room", user=user, room_id=room_id)
def user_joined_room(distributor, user, room_id):
- with PreserveLoggingContext():
- distributor.fire("user_joined_room", user=user, room_id=room_id)
+ distributor.fire("user_joined_room", user=user, room_id=room_id)
class Distributor(object):
@@ -44,9 +42,7 @@ class Distributor(object):
model will do for today.
"""
- def __init__(self, suppress_failures=True):
- self.suppress_failures = suppress_failures
-
+ def __init__(self):
self.signals = {}
self.pre_registration = {}
@@ -56,7 +52,6 @@ class Distributor(object):
self.signals[name] = Signal(
name,
- suppress_failures=self.suppress_failures,
)
if name in self.pre_registration:
@@ -75,10 +70,18 @@ class Distributor(object):
self.pre_registration[name].append(observer)
def fire(self, name, *args, **kwargs):
+ """Dispatches the given signal to the registered observers.
+
+ Runs the observers as a background process. Does not return a deferred.
+ """
if name not in self.signals:
raise KeyError("%r does not have a signal named %s" % (self, name))
- return self.signals[name].fire(*args, **kwargs)
+ run_as_background_process(
+ name,
+ self.signals[name].fire,
+ *args, **kwargs
+ )
class Signal(object):
@@ -91,9 +94,8 @@ class Signal(object):
method into all of the observers.
"""
- def __init__(self, name, suppress_failures):
+ def __init__(self, name):
self.name = name
- self.suppress_failures = suppress_failures
self.observers = []
def observe(self, observer):
@@ -103,7 +105,6 @@ class Signal(object):
Each observer callable may return a Deferred."""
self.observers.append(observer)
- @defer.inlineCallbacks
def fire(self, *args, **kwargs):
"""Invokes every callable in the observer list, passing in the args and
kwargs. Exceptions thrown by observers are logged but ignored. It is
@@ -121,22 +122,17 @@ class Signal(object):
failure.type,
failure.value,
failure.getTracebackObject()))
- if not self.suppress_failures:
- return failure
return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)
- with PreserveLoggingContext():
- deferreds = [
- do(observer)
- for observer in self.observers
- ]
-
- res = yield defer.gatherResults(
- deferreds, consumeErrors=True
- ).addErrback(unwrapFirstError)
+ deferreds = [
+ run_in_background(do, o)
+ for o in self.observers
+ ]
- defer.returnValue(res)
+ return make_deferred_yieldable(defer.gatherResults(
+ deferreds, consumeErrors=True,
+ ))
def __repr__(self):
return "<Signal name=%r>" % (self.name,)
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index f6c7175f74..8dcae50b39 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -99,6 +99,17 @@ class ContextResourceUsage(object):
self.db_sched_duration_sec = 0
self.evt_db_fetch_count = 0
+ def __repr__(self):
+ return ("<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
+ "db_txn_count='%r', db_txn_duration_sec='%r', "
+ "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>") % (
+ self.ru_stime,
+ self.ru_utime,
+ self.db_txn_count,
+ self.db_txn_duration_sec,
+ self.db_sched_duration_sec,
+ self.evt_db_fetch_count,)
+
def __iadd__(self, other):
"""Add another ContextResourceUsage's stats to this one's.
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 6ba7107896..97f1267380 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -104,12 +104,19 @@ class Measure(object):
logger.warn("Expected context. (%r)", self.name)
return
- usage = context.get_resource_usage() - self.start_usage
- block_ru_utime.labels(self.name).inc(usage.ru_utime)
- block_ru_stime.labels(self.name).inc(usage.ru_stime)
- block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
- block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
- block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
+ current = context.get_resource_usage()
+ usage = current - self.start_usage
+ try:
+ block_ru_utime.labels(self.name).inc(usage.ru_utime)
+ block_ru_stime.labels(self.name).inc(usage.ru_stime)
+ block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
+ block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
+ block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
+ except ValueError:
+ logger.warn(
+ "Failed to save metrics! OLD: %r, NEW: %r",
+ self.start_usage, current
+ )
if self.created_context:
self.start_context.__exit__(exc_type, exc_val, exc_tb)
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 9b97ea2b83..d4680863d3 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -12,18 +12,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import itertools
+
import logging
import operator
-import six
+from six import iteritems, itervalues
+from six.moves import map
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.events.utils import prune_event
from synapse.types import get_domain_from_id
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
logger = logging.getLogger(__name__)
@@ -75,19 +75,6 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
types=types,
)
- forgotten = yield make_deferred_yieldable(defer.gatherResults([
- defer.maybeDeferred(
- preserve_fn(store.who_forgot_in_room),
- room_id,
- )
- for room_id in frozenset(e.room_id for e in events)
- ], consumeErrors=True))
-
- # Set of membership event_ids that have been forgotten
- event_id_forgotten = frozenset(
- row["event_id"] for rows in forgotten for row in rows
- )
-
ignore_dict_content = yield store.get_global_account_data_by_type_for_user(
"m.ignored_user_list", user_id,
)
@@ -176,10 +163,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
if membership is None:
membership_event = state.get((EventTypes.Member, user_id), None)
if membership_event:
- # XXX why do we do this?
- # https://github.com/matrix-org/synapse/issues/3350
- if membership_event.event_id not in event_id_forgotten:
- membership = membership_event.membership
+ membership = membership_event.membership
# if the user was a member of the room at the time of the event,
# they can see it.
@@ -221,7 +205,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
return event
# check each event: gives an iterable[None|EventBase]
- filtered_events = itertools.imap(allowed, events)
+ filtered_events = map(allowed, events)
# remove the None entries
filtered_events = filter(operator.truth, filtered_events)
@@ -261,7 +245,7 @@ def filter_events_for_server(store, server_name, events):
# membership states for the requesting server to determine
# if the server is either in the room or has been invited
# into the room.
- for ev in state.itervalues():
+ for ev in itervalues(state):
if ev.type != EventTypes.Member:
continue
try:
@@ -295,7 +279,7 @@ def filter_events_for_server(store, server_name, events):
)
visibility_ids = set()
- for sids in event_to_state_ids.itervalues():
+ for sids in itervalues(event_to_state_ids):
hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
if hist:
visibility_ids.add(hist)
@@ -308,7 +292,7 @@ def filter_events_for_server(store, server_name, events):
event_map = yield store.get_events(visibility_ids)
all_open = all(
e.content.get("history_visibility") in (None, "shared", "world_readable")
- for e in event_map.itervalues()
+ for e in itervalues(event_map)
)
if all_open:
@@ -346,7 +330,7 @@ def filter_events_for_server(store, server_name, events):
#
state_key_to_event_id_set = {
e
- for key_to_eid in six.itervalues(event_to_state_ids)
+ for key_to_eid in itervalues(event_to_state_ids)
for e in key_to_eid.items()
}
@@ -369,10 +353,10 @@ def filter_events_for_server(store, server_name, events):
event_to_state = {
e_id: {
key: event_map[inner_e_id]
- for key, inner_e_id in key_to_eid.iteritems()
+ for key, inner_e_id in iteritems(key_to_eid)
if inner_e_id in event_map
}
- for e_id, key_to_eid in event_to_state_ids.iteritems()
+ for e_id, key_to_eid in iteritems(event_to_state_ids)
}
defer.returnValue([
|