diff --git a/synapse/__init__.py b/synapse/__init__.py
index 3cde33c0d7..5c0f2f83aa 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -17,4 +17,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.32.2"
+__version__ = "0.33.0"
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index bc629832d9..073229b4c4 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -65,8 +65,9 @@ class Auth(object):
@defer.inlineCallbacks
def check_from_context(self, event, context, do_sig_check=True):
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -544,7 +545,8 @@ class Auth(object):
@defer.inlineCallbacks
def add_auth_events(self, builder, context):
- auth_ids = yield self.compute_auth_events(builder, context.prev_state_ids)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ auth_ids = yield self.compute_auth_events(builder, prev_state_ids)
auth_events_entries = yield self.store.add_event_hashes(
auth_ids
@@ -737,3 +739,37 @@ class Auth(object):
)
return query_params[0]
+
+ @defer.inlineCallbacks
+ def check_in_room_or_world_readable(self, room_id, user_id):
+ """Checks that the user is or was in the room or the room is world
+ readable. If it isn't then an exception is raised.
+
+ Returns:
+ Deferred[tuple[str, str|None]]: Resolves to the current membership of
+ the user in the room and the membership event ID of the user. If
+ the user is not in the room and never has been, then
+ `(Membership.JOIN, None)` is returned.
+ """
+
+ try:
+ # check_user_was_in_room will return the most recent membership
+ # event for the user if:
+ # * The user is a non-guest user, and was ever in the room
+ # * The user is a guest user, and has joined the room
+ # else it will throw.
+ member_event = yield self.check_user_was_in_room(room_id, user_id)
+ defer.returnValue((member_event.membership, member_event.event_id))
+ except AuthError:
+ visibility = yield self.state.get_current_state(
+ room_id, EventTypes.RoomHistoryVisibility, ""
+ )
+ if (
+ visibility and
+ visibility.content["history_visibility"] == "world_readable"
+ ):
+ defer.returnValue((Membership.JOIN, None))
+ return
+ raise AuthError(
+ 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
+ )
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 25346baa87..186831e118 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -113,7 +113,13 @@ ROOM_EVENT_FILTER_SCHEMA = {
},
"contains_url": {
"type": "boolean"
- }
+ },
+ "lazy_load_members": {
+ "type": "boolean"
+ },
+ "include_redundant_members": {
+ "type": "boolean"
+ },
}
}
@@ -261,6 +267,12 @@ class FilterCollection(object):
def ephemeral_limit(self):
return self._room_ephemeral_filter.limit()
+ def lazy_load_members(self):
+ return self._room_state_filter.lazy_load_members()
+
+ def include_redundant_members(self):
+ return self._room_state_filter.include_redundant_members()
+
def filter_presence(self, events):
return self._presence_filter.filter(events)
@@ -417,6 +429,12 @@ class Filter(object):
def limit(self):
return self.filter_json.get("limit", 10)
+ def lazy_load_members(self):
+ return self.filter_json.get("lazy_load_members", False)
+
+ def include_redundant_members(self):
+ return self.filter_json.get("include_redundant_members", False)
+
def _matches_wildcard(actual_value, filter_value):
if filter_value.endswith("*"):
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index b281e7648e..6b77aec832 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -31,6 +31,7 @@ from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.directory import DirectoryStore
@@ -40,7 +41,13 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
from synapse.replication.slave.storage.room import RoomStore
from synapse.replication.slave.storage.transactions import TransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.rest.client.v1.room import PublicRoomListRestServlet
+from synapse.rest.client.v1.room import (
+ JoinedRoomMemberListRestServlet,
+ PublicRoomListRestServlet,
+ RoomEventContextServlet,
+ RoomMemberListRestServlet,
+ RoomStateRestServlet,
+)
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
@@ -52,6 +59,7 @@ logger = logging.getLogger("synapse.app.client_reader")
class ClientReaderSlavedStore(
+ SlavedAccountDataStore,
SlavedEventStore,
SlavedKeyStore,
RoomStore,
@@ -82,7 +90,13 @@ class ClientReaderServer(HomeServer):
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "client":
resource = JsonResource(self, canonical_json=False)
+
PublicRoomListRestServlet(self).register(resource)
+ RoomMemberListRestServlet(self).register(resource)
+ JoinedRoomMemberListRestServlet(self).register(resource)
+ RoomStateRestServlet(self).register(resource)
+ RoomEventContextServlet(self).register(resource)
+
resources.update({
"/_matrix/client/r0": resource,
"/_matrix/client/unstable": resource,
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index cf5cc0cf9c..e7e99fa332 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -18,6 +18,8 @@ import logging
import os
import sys
+from six import iteritems
+
from twisted.application import service
from twisted.internet import defer, reactor
from twisted.web.resource import EncodingResourceWrapper, NoResource
@@ -47,6 +49,7 @@ from synapse.http.additional_resource import AdditionalResource
from synapse.http.server import RootRedirect
from synapse.http.site import SynapseSite
from synapse.metrics import RegistryProxy
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
from synapse.module_api import ModuleApi
from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, check_requirements
@@ -427,6 +430,9 @@ def run(hs):
# currently either 0 or 1
stats_process = []
+ def start_phone_stats_home():
+ return run_as_background_process("phone_stats_home", phone_stats_home)
+
@defer.inlineCallbacks
def phone_stats_home():
logger.info("Gathering stats for reporting")
@@ -444,7 +450,7 @@ def run(hs):
stats["total_nonbridged_users"] = total_nonbridged_users
daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
- for name, count in daily_user_type_results.iteritems():
+ for name, count in iteritems(daily_user_type_results):
stats["daily_user_type_" + name] = count
room_count = yield hs.get_datastore().get_room_count()
@@ -455,7 +461,7 @@ def run(hs):
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
r30_results = yield hs.get_datastore().count_r30_users()
- for name, count in r30_results.iteritems():
+ for name, count in iteritems(r30_results):
stats["r30_users_" + name] = count
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
@@ -498,7 +504,10 @@ def run(hs):
)
def generate_user_daily_visit_stats():
- hs.get_datastore().generate_user_daily_visits()
+ return run_as_background_process(
+ "generate_user_daily_visits",
+ hs.get_datastore().generate_user_daily_visits,
+ )
# Rather than update on per session basis, batch up the requests.
# If you increase the loop period, the accuracy of user_daily_visits
@@ -507,7 +516,7 @@ def run(hs):
if hs.config.report_stats:
logger.info("Scheduling stats reporting for 3 hour intervals")
- clock.looping_call(phone_stats_home, 3 * 60 * 60 * 1000)
+ clock.looping_call(start_phone_stats_home, 3 * 60 * 60 * 1000)
# We need to defer this init for the cases that we daemonize
# otherwise the process ID we get is that of the non-daemon process
@@ -515,7 +524,7 @@ def run(hs):
# We wait 5 minutes to send the first set of stats as the server can
# be quite busy the first few minutes
- clock.call_later(5 * 60, phone_stats_home)
+ clock.call_later(5 * 60, start_phone_stats_home)
if hs.config.daemonize and hs.config.print_pidfile:
print (hs.config.pid_file)
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 26b9ec85f2..e201f18efd 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -55,7 +55,6 @@ from synapse.rest.client.v2_alpha import sync
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
-from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
@@ -81,9 +80,7 @@ class SynchrotronSlavedStore(
RoomStore,
BaseSlavedStore,
):
- did_forget = (
- RoomMemberStore.__dict__["did_forget"]
- )
+ pass
UPDATE_SYNCING_USERS_MS = 10 * 1000
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index 68acc15a9a..d658f967ba 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -25,6 +25,8 @@ import subprocess
import sys
import time
+from six import iteritems
+
import yaml
SYNAPSE = [sys.executable, "-B", "-m", "synapse.app.homeserver"]
@@ -173,7 +175,7 @@ def main():
os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)
cache_factors = config.get("synctl_cache_factors", {})
- for cache_name, factor in cache_factors.iteritems():
+ for cache_name, factor in iteritems(cache_factors):
os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)
worker_configfiles = []
diff --git a/synapse/config/voip.py b/synapse/config/voip.py
index 3a4e16fa96..d07bd24ffd 100644
--- a/synapse/config/voip.py
+++ b/synapse/config/voip.py
@@ -30,10 +30,10 @@ class VoipConfig(Config):
## Turn ##
# The public URIs of the TURN server to give to clients
- turn_uris: []
+ #turn_uris: []
# The shared secret used to compute passwords for the TURN server
- turn_shared_secret: "YOUR_SHARED_SECRET"
+ #turn_shared_secret: "YOUR_SHARED_SECRET"
# The Username and password if the TURN server needs them and
# does not use a token
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index bcd9bb5946..368b5f6ae4 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -13,22 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from six import iteritems
+
from frozendict import frozendict
from twisted.internet import defer
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+
class EventContext(object):
"""
Attributes:
- current_state_ids (dict[(str, str), str]):
- The current state map including the current event.
- (type, state_key) -> event_id
-
- prev_state_ids (dict[(str, str), str]):
- The current state map excluding the current event.
- (type, state_key) -> event_id
-
state_group (int|None): state group id, if the state has been stored
as a state group. This is usually only None if e.g. the event is
an outlier.
@@ -45,38 +41,77 @@ class EventContext(object):
prev_state_events (?): XXX: is this ever set to anything other than
the empty list?
+
+ _current_state_ids (dict[(str, str), str]|None):
+ The current state map including the current event. None if outlier
+ or we haven't fetched the state from DB yet.
+ (type, state_key) -> event_id
+
+ _prev_state_ids (dict[(str, str), str]|None):
+ The current state map excluding the current event. None if outlier
+ or we haven't fetched the state from DB yet.
+ (type, state_key) -> event_id
+
+ _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have
+ been calculated. None if we haven't started calculating yet
+
+ _event_type (str): The type of the event the context is associated with.
+ Only set when state has not been fetched yet.
+
+ _event_state_key (str|None): The state_key of the event the context is
+ associated with. Only set when state has not been fetched yet.
+
+ _prev_state_id (str|None): If the event associated with the context is
+ a state event, then `_prev_state_id` is the event_id of the state
+ that was replaced.
+ Only set when state has not been fetched yet.
"""
__slots__ = [
- "current_state_ids",
- "prev_state_ids",
"state_group",
"rejected",
"prev_group",
"delta_ids",
"prev_state_events",
"app_service",
+ "_current_state_ids",
+ "_prev_state_ids",
+ "_prev_state_id",
+ "_event_type",
+ "_event_state_key",
+ "_fetching_state_deferred",
]
def __init__(self):
+ self.prev_state_events = []
+ self.rejected = False
+ self.app_service = None
+
+ @staticmethod
+ def with_state(state_group, current_state_ids, prev_state_ids,
+ prev_group=None, delta_ids=None):
+ context = EventContext()
+
# The current state including the current event
- self.current_state_ids = None
+ context._current_state_ids = current_state_ids
# The current state excluding the current event
- self.prev_state_ids = None
- self.state_group = None
+ context._prev_state_ids = prev_state_ids
+ context.state_group = state_group
- self.rejected = False
+ context._prev_state_id = None
+ context._event_type = None
+ context._event_state_key = None
+ context._fetching_state_deferred = defer.succeed(None)
# A previously persisted state group and a delta between that
# and this state.
- self.prev_group = None
- self.delta_ids = None
+ context.prev_group = prev_group
+ context.delta_ids = delta_ids
- self.prev_state_events = None
-
- self.app_service = None
+ return context
- def serialize(self, event):
+ @defer.inlineCallbacks
+ def serialize(self, event, store):
"""Converts self to a type that can be serialized as JSON, and then
deserialized by `deserialize`
@@ -92,11 +127,12 @@ class EventContext(object):
# the prev_state_ids, so if we're a state event we include the event
# id that we replaced in the state.
if event.is_state():
- prev_state_id = self.prev_state_ids.get((event.type, event.state_key))
+ prev_state_ids = yield self.get_prev_state_ids(store)
+ prev_state_id = prev_state_ids.get((event.type, event.state_key))
else:
prev_state_id = None
- return {
+ defer.returnValue({
"prev_state_id": prev_state_id,
"event_type": event.type,
"event_state_key": event.state_key if event.is_state() else None,
@@ -106,10 +142,9 @@ class EventContext(object):
"delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events,
"app_service_id": self.app_service.id if self.app_service else None
- }
+ })
@staticmethod
- @defer.inlineCallbacks
def deserialize(store, input):
"""Converts a dict that was produced by `serialize` back into a
EventContext.
@@ -122,32 +157,115 @@ class EventContext(object):
EventContext
"""
context = EventContext()
+
+ # We use the state_group and prev_state_id stuff to pull the
+ # current_state_ids out of the DB and construct prev_state_ids.
+ context._prev_state_id = input["prev_state_id"]
+ context._event_type = input["event_type"]
+ context._event_state_key = input["event_state_key"]
+
+ context._current_state_ids = None
+ context._prev_state_ids = None
+ context._fetching_state_deferred = None
+
context.state_group = input["state_group"]
- context.rejected = input["rejected"]
context.prev_group = input["prev_group"]
context.delta_ids = _decode_state_dict(input["delta_ids"])
+
+ context.rejected = input["rejected"]
context.prev_state_events = input["prev_state_events"]
- # We use the state_group and prev_state_id stuff to pull the
- # current_state_ids out of the DB and construct prev_state_ids.
- prev_state_id = input["prev_state_id"]
- event_type = input["event_type"]
- event_state_key = input["event_state_key"]
+ app_service_id = input["app_service_id"]
+ if app_service_id:
+ context.app_service = store.get_app_service_by_id(app_service_id)
+
+ return context
+
+ @defer.inlineCallbacks
+ def get_current_state_ids(self, store):
+ """Gets the current state IDs
+
+ Returns:
+ Deferred[dict[(str, str), str]|None]: Returns None if state_group
+ is None, which happens when the associated event is an outlier.
+ """
+
+ if not self._fetching_state_deferred:
+ self._fetching_state_deferred = run_in_background(
+ self._fill_out_state, store,
+ )
+
+ yield make_deferred_yieldable(self._fetching_state_deferred)
+
+ defer.returnValue(self._current_state_ids)
+
+ @defer.inlineCallbacks
+ def get_prev_state_ids(self, store):
+ """Gets the prev state IDs
+
+ Returns:
+ Deferred[dict[(str, str), str]|None]: Returns None if state_group
+ is None, which happens when the associated event is an outlier.
+ """
+
+ if not self._fetching_state_deferred:
+ self._fetching_state_deferred = run_in_background(
+ self._fill_out_state, store,
+ )
+
+ yield make_deferred_yieldable(self._fetching_state_deferred)
- context.current_state_ids = yield store.get_state_ids_for_group(
- context.state_group,
+ defer.returnValue(self._prev_state_ids)
+
+ def get_cached_current_state_ids(self):
+ """Gets the current state IDs if we have them already cached.
+
+ Returns:
+ dict[(str, str), str]|None: Returns None if we haven't cached the
+ state or if state_group is None, which happens when the associated
+ event is an outlier.
+ """
+
+ return self._current_state_ids
+
+ @defer.inlineCallbacks
+ def _fill_out_state(self, store):
+ """Called to populate the _current_state_ids and _prev_state_ids
+ attributes by loading from the database.
+ """
+ if self.state_group is None:
+ return
+
+ self._current_state_ids = yield store.get_state_ids_for_group(
+ self.state_group,
)
- if prev_state_id and event_state_key:
- context.prev_state_ids = dict(context.current_state_ids)
- context.prev_state_ids[(event_type, event_state_key)] = prev_state_id
+ if self._prev_state_id and self._event_state_key is not None:
+ self._prev_state_ids = dict(self._current_state_ids)
+
+ key = (self._event_type, self._event_state_key)
+ self._prev_state_ids[key] = self._prev_state_id
else:
- context.prev_state_ids = context.current_state_ids
+ self._prev_state_ids = self._current_state_ids
- app_service_id = input["app_service_id"]
- if app_service_id:
- context.app_service = store.get_app_service_by_id(app_service_id)
+ @defer.inlineCallbacks
+ def update_state(self, state_group, prev_state_ids, current_state_ids,
+ prev_group, delta_ids):
+ """Replace the state in the context
+ """
+
+ # We need to make sure we wait for any ongoing fetching of state
+ # to complete so that the updated state doesn't get clobbered
+ if self._fetching_state_deferred:
+ yield make_deferred_yieldable(self._fetching_state_deferred)
+
+ self.state_group = state_group
+ self._prev_state_ids = prev_state_ids
+ self.prev_group = prev_group
+ self._current_state_ids = current_state_ids
+ self.delta_ids = delta_ids
- defer.returnValue(context)
+ # We need to ensure that that we've marked as having fetched the state
+ self._fetching_state_deferred = defer.succeed(None)
def _encode_state_dict(state_dict):
@@ -159,7 +277,7 @@ def _encode_state_dict(state_dict):
return [
(etype, state_key, v)
- for (etype, state_key), v in state_dict.iteritems()
+ for (etype, state_key), v in iteritems(state_dict)
]
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 48f26db67c..e501251b6e 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -24,6 +24,7 @@ from prometheus_client import Counter
from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
+from twisted.python import failure
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError
@@ -186,8 +187,12 @@ class FederationServer(FederationBase):
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
except Exception as e:
+ f = failure.Failure()
pdu_results[event_id] = {"error": str(e)}
- logger.exception("Failed to handle PDU %s", event_id)
+ logger.error(
+ "Failed to handle PDU %s: %s",
+ event_id, f.getTraceback().rstrip(),
+ )
yield async.concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
@@ -203,8 +208,8 @@ class FederationServer(FederationBase):
)
pdu_failures = getattr(transaction, "pdu_failures", [])
- for failure in pdu_failures:
- logger.info("Got failure %r", failure)
+ for fail in pdu_failures:
+ logger.info("Got failure %r", fail)
response = {
"pdus": pdu_results,
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 5a956ecfb3..6996d6b695 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -30,7 +30,8 @@ from synapse.metrics import (
sent_edus_counter,
sent_transactions_counter,
)
-from synapse.util import PreserveLoggingContext, logcontext
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util import logcontext
from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
@@ -165,10 +166,11 @@ class TransactionQueue(object):
if self._is_processing:
return
- # fire off a processing loop in the background. It's likely it will
- # outlast the current request, so run it in the sentinel logcontext.
- with PreserveLoggingContext():
- self._process_event_queue_loop()
+ # fire off a processing loop in the background
+ run_as_background_process(
+ "process_event_queue_for_federation",
+ self._process_event_queue_loop,
+ )
@defer.inlineCallbacks
def _process_event_queue_loop(self):
@@ -432,14 +434,11 @@ class TransactionQueue(object):
logger.debug("TX [%s] Starting transaction loop", destination)
- # Drop the logcontext before starting the transaction. It doesn't
- # really make sense to log all the outbound transactions against
- # whatever path led us to this point: that's pretty arbitrary really.
- #
- # (this also means we can fire off _perform_transaction without
- # yielding)
- with logcontext.PreserveLoggingContext():
- self._transaction_transmission_loop(destination)
+ run_as_background_process(
+ "federation_transaction_transmission_loop",
+ self._transaction_transmission_loop,
+ destination,
+ )
@defer.inlineCallbacks
def _transaction_transmission_loop(self, destination):
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index c9beca27c2..8574898f0c 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -404,10 +404,10 @@ class FederationMakeLeaveServlet(BaseFederationServlet):
class FederationSendLeaveServlet(BaseFederationServlet):
- PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<txid>[^/]*)"
+ PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)"
@defer.inlineCallbacks
- def on_PUT(self, origin, content, query, room_id, txid):
+ def on_PUT(self, origin, content, query, room_id, event_id):
content = yield self.handler.on_send_leave_request(origin, content)
defer.returnValue((200, content))
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py
index 47452700a8..b04f4234ca 100644
--- a/synapse/groups/attestations.py
+++ b/synapse/groups/attestations.py
@@ -43,6 +43,7 @@ from signedjson.sign import sign_json
from twisted.internet import defer
from synapse.api.errors import SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import get_domain_from_id
from synapse.util.logcontext import run_in_background
@@ -129,7 +130,7 @@ class GroupAttestionRenewer(object):
self.attestations = hs.get_groups_attestation_signing()
self._renew_attestations_loop = self.clock.looping_call(
- self._renew_attestations, 30 * 60 * 1000,
+ self._start_renew_attestations, 30 * 60 * 1000,
)
@defer.inlineCallbacks
@@ -151,6 +152,9 @@ class GroupAttestionRenewer(object):
defer.returnValue({})
+ def _start_renew_attestations(self):
+ return run_as_background_process("renew_attestations", self._renew_attestations)
+
@defer.inlineCallbacks
def _renew_attestations(self):
"""Called periodically to check if we need to update any of our attestations
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 4b9923d8c0..413425fed1 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -17,9 +17,7 @@ from .admin import AdminHandler
from .directory import DirectoryHandler
from .federation import FederationHandler
from .identity import IdentityHandler
-from .message import MessageHandler
from .register import RegistrationHandler
-from .room import RoomContextHandler
from .search import SearchHandler
@@ -44,10 +42,8 @@ class Handlers(object):
def __init__(self, hs):
self.registration_handler = RegistrationHandler(hs)
- self.message_handler = MessageHandler(hs)
self.federation_handler = FederationHandler(hs)
self.directory_handler = DirectoryHandler(hs)
self.admin_handler = AdminHandler(hs)
self.identity_handler = IdentityHandler(hs)
self.search_handler = SearchHandler(hs)
- self.room_context_handler = RoomContextHandler(hs)
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index b6a8b3aa3b..704181d2d3 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -112,8 +112,9 @@ class BaseHandler(object):
guest_access = event.content.get("guest_access", "forbidden")
if guest_access != "can_join":
if context:
+ current_state_ids = yield context.get_current_state_ids(self.store)
current_state = yield self.store.get_events(
- list(context.current_state_ids.values())
+ list(current_state_ids.values())
)
else:
current_state = yield self.state_handler.get_current_state(
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ec9fe01a5a..ee41aed69e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -23,6 +23,7 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure
@@ -106,7 +107,9 @@ class ApplicationServicesHandler(object):
yield self._check_user_exists(event.state_key)
if not self.started_scheduler:
- self.scheduler.start().addErrback(log_failure)
+ def start_scheduler():
+ return self.scheduler.start().addErrback(log_failure)
+ run_as_background_process("as_scheduler", start_scheduler)
self.started_scheduler = True
# Fork off pushes to these services
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d3ecebd29f..49068c06d9 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -21,8 +21,8 @@ import logging
import sys
import six
-from six import iteritems
-from six.moves import http_client
+from six import iteritems, itervalues
+from six.moves import http_client, zip
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
@@ -43,7 +43,6 @@ from synapse.crypto.event_signing import (
add_hashes_and_signatures,
compute_event_signature,
)
-from synapse.events.utils import prune_event
from synapse.events.validator import EventValidator
from synapse.state import resolve_events_with_factory
from synapse.types import UserID, get_domain_from_id
@@ -52,8 +51,8 @@ from synapse.util.async import Linearizer
from synapse.util.distributor import user_joined_room
from synapse.util.frozenutils import unfreeze
from synapse.util.logutils import log_function
-from synapse.util.metrics import measure_func
from synapse.util.retryutils import NotRetryingDestination
+from synapse.visibility import filter_events_for_server
from ._base import BaseHandler
@@ -487,7 +486,10 @@ class FederationHandler(BaseHandler):
# joined the room. Don't bother if the user is just
# changing their profile info.
newly_joined = True
- prev_state_id = context.prev_state_ids.get(
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ prev_state_id = prev_state_ids.get(
(event.type, event.state_key)
)
if prev_state_id:
@@ -501,137 +503,6 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
- @measure_func("_filter_events_for_server")
- @defer.inlineCallbacks
- def _filter_events_for_server(self, server_name, room_id, events):
- """Filter the given events for the given server, redacting those the
- server can't see.
-
- Assumes the server is currently in the room.
-
- Returns
- list[FrozenEvent]
- """
- # First lets check to see if all the events have a history visibility
- # of "shared" or "world_readable". If thats the case then we don't
- # need to check membership (as we know the server is in the room).
- event_to_state_ids = yield self.store.get_state_ids_for_events(
- frozenset(e.event_id for e in events),
- types=(
- (EventTypes.RoomHistoryVisibility, ""),
- )
- )
-
- visibility_ids = set()
- for sids in event_to_state_ids.itervalues():
- hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
- if hist:
- visibility_ids.add(hist)
-
- # If we failed to find any history visibility events then the default
- # is "shared" visiblity.
- if not visibility_ids:
- defer.returnValue(events)
-
- event_map = yield self.store.get_events(visibility_ids)
- all_open = all(
- e.content.get("history_visibility") in (None, "shared", "world_readable")
- for e in event_map.itervalues()
- )
-
- if all_open:
- defer.returnValue(events)
-
- # Ok, so we're dealing with events that have non-trivial visibility
- # rules, so we need to also get the memberships of the room.
-
- event_to_state_ids = yield self.store.get_state_ids_for_events(
- frozenset(e.event_id for e in events),
- types=(
- (EventTypes.RoomHistoryVisibility, ""),
- (EventTypes.Member, None),
- )
- )
-
- # We only want to pull out member events that correspond to the
- # server's domain.
-
- def check_match(id):
- try:
- return server_name == get_domain_from_id(id)
- except Exception:
- return False
-
- # Parses mapping `event_id -> (type, state_key) -> state event_id`
- # to get all state ids that we're interested in.
- event_map = yield self.store.get_events([
- e_id
- for key_to_eid in list(event_to_state_ids.values())
- for key, e_id in key_to_eid.items()
- if key[0] != EventTypes.Member or check_match(key[1])
- ])
-
- event_to_state = {
- e_id: {
- key: event_map[inner_e_id]
- for key, inner_e_id in key_to_eid.iteritems()
- if inner_e_id in event_map
- }
- for e_id, key_to_eid in event_to_state_ids.iteritems()
- }
-
- erased_senders = yield self.store.are_users_erased(
- e.sender for e in events,
- )
-
- def redact_disallowed(event, state):
- # if the sender has been gdpr17ed, always return a redacted
- # copy of the event.
- if erased_senders[event.sender]:
- logger.info(
- "Sender of %s has been erased, redacting",
- event.event_id,
- )
- return prune_event(event)
-
- if not state:
- return event
-
- history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
- if history:
- visibility = history.content.get("history_visibility", "shared")
- if visibility in ["invited", "joined"]:
- # We now loop through all state events looking for
- # membership states for the requesting server to determine
- # if the server is either in the room or has been invited
- # into the room.
- for ev in state.itervalues():
- if ev.type != EventTypes.Member:
- continue
- try:
- domain = get_domain_from_id(ev.state_key)
- except Exception:
- continue
-
- if domain != server_name:
- continue
-
- memtype = ev.membership
- if memtype == Membership.JOIN:
- return event
- elif memtype == Membership.INVITE:
- if visibility == "invited":
- return event
- else:
- return prune_event(event)
-
- return event
-
- defer.returnValue([
- redact_disallowed(e, event_to_state[e.event_id])
- for e in events
- ])
-
@log_function
@defer.inlineCallbacks
def backfill(self, dest, room_id, limit, extremities):
@@ -863,7 +734,7 @@ class FederationHandler(BaseHandler):
"""
joined_users = [
(state_key, int(event.depth))
- for (e_type, state_key), event in state.iteritems()
+ for (e_type, state_key), event in iteritems(state)
if e_type == EventTypes.Member
and event.membership == Membership.JOIN
]
@@ -880,7 +751,7 @@ class FederationHandler(BaseHandler):
except Exception:
pass
- return sorted(joined_domains.iteritems(), key=lambda d: d[1])
+ return sorted(joined_domains.items(), key=lambda d: d[1])
curr_domains = get_domains_from_state(curr_state)
@@ -943,7 +814,7 @@ class FederationHandler(BaseHandler):
tried_domains = set(likely_domains)
tried_domains.add(self.server_name)
- event_ids = list(extremities.iterkeys())
+ event_ids = list(extremities.keys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = logcontext.preserve_fn(
@@ -959,15 +830,15 @@ class FederationHandler(BaseHandler):
states = dict(zip(event_ids, [s.state for s in states]))
state_map = yield self.store.get_events(
- [e_id for ids in states.itervalues() for e_id in ids.itervalues()],
+ [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
get_prev_content=False
)
states = {
key: {
k: state_map[e_id]
- for k, e_id in state_dict.iteritems()
+ for k, e_id in iteritems(state_dict)
if e_id in state_map
- } for key, state_dict in states.iteritems()
+ } for key, state_dict in iteritems(states)
}
for e_id, _ in sorted_extremeties_tuple:
@@ -1038,16 +909,6 @@ class FederationHandler(BaseHandler):
[auth_id for auth_id, _ in event.auth_events],
include_given=True
)
-
- for event in auth:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
defer.returnValue([e for e in auth])
@log_function
@@ -1248,10 +1109,12 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
- state_ids = list(context.prev_state_ids.values())
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ state_ids = list(prev_state_ids.values())
auth_chain = yield self.store.get_auth_chain(state_ids)
- state = yield self.store.get_events(list(context.prev_state_ids.values()))
+ state = yield self.store.get_events(list(prev_state_ids.values()))
defer.returnValue({
"state": list(state.values()),
@@ -1416,7 +1279,7 @@ class FederationHandler(BaseHandler):
@log_function
def on_make_leave_request(self, room_id, user_id):
""" We've received a /make_leave/ request, so we create a partial
- join event for the room and return that. We do *not* persist or
+ leave event for the room and return that. We do *not* persist or
process it until the other server has signed it and sent it back.
"""
builder = self.event_builder_factory.new({
@@ -1503,18 +1366,6 @@ class FederationHandler(BaseHandler):
del results[(event.type, event.state_key)]
res = list(results.values())
- for event in res:
- # We sign these again because there was a bug where we
- # incorrectly signed things the first time round
- if self.is_mine_id(event.event_id):
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
defer.returnValue(res)
else:
defer.returnValue([])
@@ -1558,7 +1409,7 @@ class FederationHandler(BaseHandler):
limit
)
- events = yield self._filter_events_for_server(origin, room_id, events)
+ events = yield filter_events_for_server(self.store, origin, events)
defer.returnValue(events)
@@ -1586,18 +1437,6 @@ class FederationHandler(BaseHandler):
)
if event:
- if self.is_mine_id(event.event_id):
- # FIXME: This is a temporary work around where we occasionally
- # return events slightly differently than when they were
- # originally signed
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
in_room = yield self.auth.check_host_in_room(
event.room_id,
origin
@@ -1605,8 +1444,8 @@ class FederationHandler(BaseHandler):
if not in_room:
raise AuthError(403, "Host not in room.")
- events = yield self._filter_events_for_server(
- origin, event.room_id, [event]
+ events = yield filter_events_for_server(
+ self.store, origin, [event],
)
event = events[0]
defer.returnValue(event)
@@ -1681,7 +1520,7 @@ class FederationHandler(BaseHandler):
yield self.store.persist_events(
[
(ev_info["event"], context)
- for ev_info, context in itertools.izip(event_infos, contexts)
+ for ev_info, context in zip(event_infos, contexts)
],
backfilled=backfilled,
)
@@ -1801,8 +1640,9 @@ class FederationHandler(BaseHandler):
)
if not auth_events:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -1862,15 +1702,6 @@ class FederationHandler(BaseHandler):
local_auth_chain, remote_auth_chain
)
- for event in ret["auth_chain"]:
- event.signatures.update(
- compute_event_signature(
- event,
- self.hs.hostname,
- self.hs.config.signing_key[0]
- )
- )
-
logger.debug("on_query_auth returning: %s", ret)
defer.returnValue(ret)
@@ -1896,8 +1727,8 @@ class FederationHandler(BaseHandler):
min_depth=min_depth,
)
- missing_events = yield self._filter_events_for_server(
- origin, room_id, missing_events,
+ missing_events = yield filter_events_for_server(
+ self.store, origin, missing_events,
)
defer.returnValue(missing_events)
@@ -2051,9 +1882,10 @@ class FederationHandler(BaseHandler):
break
if do_resolution:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
# 1. Get what we think is the auth chain.
auth_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids
+ event, prev_state_ids
)
local_auth_chain = yield self.store.get_auth_chain(
auth_ids, include_given=True
@@ -2143,21 +1975,34 @@ class FederationHandler(BaseHandler):
k: a.event_id for k, a in iteritems(auth_events)
if k != event_key
}
- context.current_state_ids = dict(context.current_state_ids)
- context.current_state_ids.update(state_updates)
- if context.delta_ids is not None:
- context.delta_ids = dict(context.delta_ids)
- context.delta_ids.update(state_updates)
- context.prev_state_ids = dict(context.prev_state_ids)
- context.prev_state_ids.update({
+ current_state_ids = yield context.get_current_state_ids(self.store)
+ current_state_ids = dict(current_state_ids)
+
+ current_state_ids.update(state_updates)
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_state_ids = dict(prev_state_ids)
+
+ prev_state_ids.update({
k: a.event_id for k, a in iteritems(auth_events)
})
- context.state_group = yield self.store.store_state_group(
+
+ # create a new state group as a delta from the existing one.
+ prev_group = context.state_group
+ state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
- prev_group=context.prev_group,
- delta_ids=context.delta_ids,
- current_state_ids=context.current_state_ids,
+ prev_group=prev_group,
+ delta_ids=state_updates,
+ current_state_ids=current_state_ids,
+ )
+
+ yield context.update_state(
+ state_group=state_group,
+ current_state_ids=current_state_ids,
+ prev_state_ids=prev_state_ids,
+ prev_group=prev_group,
+ delta_ids=state_updates,
)
@defer.inlineCallbacks
@@ -2397,7 +2242,8 @@ class FederationHandler(BaseHandler):
event.content["third_party_invite"]["signed"]["token"]
)
original_invite = None
- original_invite_id = context.prev_state_ids.get(key)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ original_invite_id = prev_state_ids.get(key)
if original_invite_id:
original_invite = yield self.store.get_event(
original_invite_id, allow_none=True
@@ -2439,7 +2285,8 @@ class FederationHandler(BaseHandler):
signed = event.content["third_party_invite"]["signed"]
token = signed["token"]
- invite_event_id = context.prev_state_ids.get(
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ invite_event_id = prev_state_ids.get(
(EventTypes.ThirdPartyInvite, token,)
)
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index fb11716eb8..40e7580a61 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -148,13 +148,15 @@ class InitialSyncHandler(BaseHandler):
try:
if event.membership == Membership.JOIN:
room_end_token = now_token.room_key
- deferred_room_state = self.state_handler.get_current_state(
- event.room_id
+ deferred_room_state = run_in_background(
+ self.state_handler.get_current_state,
+ event.room_id,
)
elif event.membership == Membership.LEAVE:
room_end_token = "s%d" % (event.stream_ordering,)
- deferred_room_state = self.store.get_state_for_events(
- [event.event_id], None
+ deferred_room_state = run_in_background(
+ self.store.get_state_for_events,
+ [event.event_id], None,
)
deferred_room_state.addCallback(
lambda states: states[event.event_id]
@@ -387,19 +389,21 @@ class InitialSyncHandler(BaseHandler):
receipts = []
defer.returnValue(receipts)
- presence, receipts, (messages, token) = yield defer.gatherResults(
- [
- run_in_background(get_presence),
- run_in_background(get_receipts),
- run_in_background(
- self.store.get_recent_events_for_room,
- room_id,
- limit=limit,
- end_token=now_token.room_key,
- )
- ],
- consumeErrors=True,
- ).addErrback(unwrapFirstError)
+ presence, receipts, (messages, token) = yield make_deferred_yieldable(
+ defer.gatherResults(
+ [
+ run_in_background(get_presence),
+ run_in_background(get_receipts),
+ run_in_background(
+ self.store.get_recent_events_for_room,
+ room_id,
+ limit=limit,
+ end_token=now_token.room_key,
+ )
+ ],
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError),
+ )
messages = yield filter_events_for_client(
self.store, user_id, messages, is_peeking=is_peeking,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a39b852ceb..39d7724778 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -23,7 +23,6 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
from twisted.internet.defer import succeed
-from twisted.python.failure import Failure
from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
@@ -32,247 +31,26 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import send_event_to_master
-from synapse.types import RoomAlias, RoomStreamToken, UserID
-from synapse.util.async import Limiter, ReadWriteLock
+from synapse.types import RoomAlias, UserID
+from synapse.util.async import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
-from synapse.util.stringutils import random_string
-from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
logger = logging.getLogger(__name__)
-class PurgeStatus(object):
- """Object tracking the status of a purge request
-
- This class contains information on the progress of a purge request, for
- return by get_purge_status.
-
- Attributes:
- status (int): Tracks whether this request has completed. One of
- STATUS_{ACTIVE,COMPLETE,FAILED}
+class MessageHandler(object):
+ """Contains some read only APIs to get state about a room
"""
- STATUS_ACTIVE = 0
- STATUS_COMPLETE = 1
- STATUS_FAILED = 2
-
- STATUS_TEXT = {
- STATUS_ACTIVE: "active",
- STATUS_COMPLETE: "complete",
- STATUS_FAILED: "failed",
- }
-
- def __init__(self):
- self.status = PurgeStatus.STATUS_ACTIVE
-
- def asdict(self):
- return {
- "status": PurgeStatus.STATUS_TEXT[self.status]
- }
-
-
-class MessageHandler(BaseHandler):
-
def __init__(self, hs):
- super(MessageHandler, self).__init__(hs)
- self.hs = hs
- self.state = hs.get_state_handler()
+ self.auth = hs.get_auth()
self.clock = hs.get_clock()
-
- self.pagination_lock = ReadWriteLock()
- self._purges_in_progress_by_room = set()
- # map from purge id to PurgeStatus
- self._purges_by_id = {}
-
- def start_purge_history(self, room_id, token,
- delete_local_events=False):
- """Start off a history purge on a room.
-
- Args:
- room_id (str): The room to purge from
-
- token (str): topological token to delete events before
- delete_local_events (bool): True to delete local events as well as
- remote ones
-
- Returns:
- str: unique ID for this purge transaction.
- """
- if room_id in self._purges_in_progress_by_room:
- raise SynapseError(
- 400,
- "History purge already in progress for %s" % (room_id, ),
- )
-
- purge_id = random_string(16)
-
- # we log the purge_id here so that it can be tied back to the
- # request id in the log lines.
- logger.info("[purge] starting purge_id %s", purge_id)
-
- self._purges_by_id[purge_id] = PurgeStatus()
- run_in_background(
- self._purge_history,
- purge_id, room_id, token, delete_local_events,
- )
- return purge_id
-
- @defer.inlineCallbacks
- def _purge_history(self, purge_id, room_id, token,
- delete_local_events):
- """Carry out a history purge on a room.
-
- Args:
- purge_id (str): The id for this purge
- room_id (str): The room to purge from
- token (str): topological token to delete events before
- delete_local_events (bool): True to delete local events as well as
- remote ones
-
- Returns:
- Deferred
- """
- self._purges_in_progress_by_room.add(room_id)
- try:
- with (yield self.pagination_lock.write(room_id)):
- yield self.store.purge_history(
- room_id, token, delete_local_events,
- )
- logger.info("[purge] complete")
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
- except Exception:
- logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
- self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
- finally:
- self._purges_in_progress_by_room.discard(room_id)
-
- # remove the purge from the list 24 hours after it completes
- def clear_purge():
- del self._purges_by_id[purge_id]
- self.hs.get_reactor().callLater(24 * 3600, clear_purge)
-
- def get_purge_status(self, purge_id):
- """Get the current status of an active purge
-
- Args:
- purge_id (str): purge_id returned by start_purge_history
-
- Returns:
- PurgeStatus|None
- """
- return self._purges_by_id.get(purge_id)
-
- @defer.inlineCallbacks
- def get_messages(self, requester, room_id=None, pagin_config=None,
- as_client_event=True, event_filter=None):
- """Get messages in a room.
-
- Args:
- requester (Requester): The user requesting messages.
- room_id (str): The room they want messages from.
- pagin_config (synapse.api.streams.PaginationConfig): The pagination
- config rules to apply, if any.
- as_client_event (bool): True to get events in client-server format.
- event_filter (Filter): Filter to apply to results or None
- Returns:
- dict: Pagination API results
- """
- user_id = requester.user.to_string()
-
- if pagin_config.from_token:
- room_token = pagin_config.from_token.room_key
- else:
- pagin_config.from_token = (
- yield self.hs.get_event_sources().get_current_token_for_room(
- room_id=room_id
- )
- )
- room_token = pagin_config.from_token.room_key
-
- room_token = RoomStreamToken.parse(room_token)
-
- pagin_config.from_token = pagin_config.from_token.copy_and_replace(
- "room_key", str(room_token)
- )
-
- source_config = pagin_config.get_source_config("room")
-
- with (yield self.pagination_lock.read(room_id)):
- membership, member_event_id = yield self._check_in_room_or_world_readable(
- room_id, user_id
- )
-
- if source_config.direction == 'b':
- # if we're going backwards, we might need to backfill. This
- # requires that we have a topo token.
- if room_token.topological:
- max_topo = room_token.topological
- else:
- max_topo = yield self.store.get_max_topological_token(
- room_id, room_token.stream
- )
-
- if membership == Membership.LEAVE:
- # If they have left the room then clamp the token to be before
- # they left the room, to save the effort of loading from the
- # database.
- leave_token = yield self.store.get_topological_token_for_event(
- member_event_id
- )
- leave_token = RoomStreamToken.parse(leave_token)
- if leave_token.topological < max_topo:
- source_config.from_key = str(leave_token)
-
- yield self.hs.get_handlers().federation_handler.maybe_backfill(
- room_id, max_topo
- )
-
- events, next_key = yield self.store.paginate_room_events(
- room_id=room_id,
- from_key=source_config.from_key,
- to_key=source_config.to_key,
- direction=source_config.direction,
- limit=source_config.limit,
- event_filter=event_filter,
- )
-
- next_token = pagin_config.from_token.copy_and_replace(
- "room_key", next_key
- )
-
- if not events:
- defer.returnValue({
- "chunk": [],
- "start": pagin_config.from_token.to_string(),
- "end": next_token.to_string(),
- })
-
- if event_filter:
- events = event_filter.filter(events)
-
- events = yield filter_events_for_client(
- self.store,
- user_id,
- events,
- is_peeking=(member_event_id is None),
- )
-
- time_now = self.clock.time_msec()
-
- chunk = {
- "chunk": [
- serialize_event(e, time_now, as_client_event)
- for e in events
- ],
- "start": pagin_config.from_token.to_string(),
- "end": next_token.to_string(),
- }
-
- defer.returnValue(chunk)
+ self.state = hs.get_state_handler()
+ self.store = hs.get_datastore()
@defer.inlineCallbacks
def get_room_data(self, user_id=None, room_id=None,
@@ -286,12 +64,12 @@ class MessageHandler(BaseHandler):
Raises:
SynapseError if something went wrong.
"""
- membership, membership_event_id = yield self._check_in_room_or_world_readable(
+ membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
- data = yield self.state_handler.get_current_state(
+ data = yield self.state.get_current_state(
room_id, event_type, state_key
)
elif membership == Membership.LEAVE:
@@ -304,31 +82,6 @@ class MessageHandler(BaseHandler):
defer.returnValue(data)
@defer.inlineCallbacks
- def _check_in_room_or_world_readable(self, room_id, user_id):
- try:
- # check_user_was_in_room will return the most recent membership
- # event for the user if:
- # * The user is a non-guest user, and was ever in the room
- # * The user is a guest user, and has joined the room
- # else it will throw.
- member_event = yield self.auth.check_user_was_in_room(room_id, user_id)
- defer.returnValue((member_event.membership, member_event.event_id))
- return
- except AuthError:
- visibility = yield self.state_handler.get_current_state(
- room_id, EventTypes.RoomHistoryVisibility, ""
- )
- if (
- visibility and
- visibility.content["history_visibility"] == "world_readable"
- ):
- defer.returnValue((Membership.JOIN, None))
- return
- raise AuthError(
- 403, "Guest access not allowed", errcode=Codes.GUEST_ACCESS_FORBIDDEN
- )
-
- @defer.inlineCallbacks
def get_state_events(self, user_id, room_id, is_guest=False):
"""Retrieve all state events for a given room. If the user is
joined to the room then return the current state. If the user has
@@ -340,12 +93,12 @@ class MessageHandler(BaseHandler):
Returns:
A list of dicts representing state events. [{}, {}, {}]
"""
- membership, membership_event_id = yield self._check_in_room_or_world_readable(
+ membership, membership_event_id = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership == Membership.JOIN:
- room_state = yield self.state_handler.get_current_state(room_id)
+ room_state = yield self.state.get_current_state(room_id)
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
[membership_event_id], None
@@ -373,7 +126,7 @@ class MessageHandler(BaseHandler):
if not requester.app_service:
# We check AS auth after fetching the room membership, as it
# requires us to pull out all joined members anyway.
- membership, _ = yield self._check_in_room_or_world_readable(
+ membership, _ = yield self.auth.check_in_room_or_world_readable(
room_id, user_id
)
if membership != Membership.JOIN:
@@ -427,7 +180,7 @@ class EventCreationHandler(object):
# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
- self.limiter = Limiter(max_count=5)
+ self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
self.action_generator = hs.get_action_generator()
@@ -630,7 +383,8 @@ class EventCreationHandler(object):
If so, returns the version of the event in context.
Otherwise, returns None.
"""
- prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_event_id = prev_state_ids.get((event.type, event.state_key))
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
if not prev_event:
return
@@ -752,8 +506,8 @@ class EventCreationHandler(object):
event = builder.build()
logger.debug(
- "Created event %s with state: %s",
- event.event_id, context.prev_state_ids,
+ "Created event %s",
+ event.event_id,
)
defer.returnValue(
@@ -806,8 +560,9 @@ class EventCreationHandler(object):
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
- self.hs.get_clock(),
- self.http_client,
+ clock=self.hs.get_clock(),
+ store=self.store,
+ client=self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester,
@@ -884,9 +639,11 @@ class EventCreationHandler(object):
e.sender == event.sender
)
+ current_state_ids = yield context.get_current_state_ids(self.store)
+
state_to_include_ids = [
e_id
- for k, e_id in iteritems(context.current_state_ids)
+ for k, e_id in iteritems(current_state_ids)
if k[0] in self.hs.config.room_invite_state_types
or k == (EventTypes.Member, event.sender)
]
@@ -922,8 +679,9 @@ class EventCreationHandler(object):
)
if event.type == EventTypes.Redaction:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -943,11 +701,13 @@ class EventCreationHandler(object):
"You don't have permission to redact events"
)
- if event.type == EventTypes.Create and context.prev_state_ids:
- raise AuthError(
- 403,
- "Changing the room create event is forbidden",
- )
+ if event.type == EventTypes.Create:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ if prev_state_ids:
+ raise AuthError(
+ 403,
+ "Changing the room create event is forbidden",
+ )
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
new file mode 100644
index 0000000000..b2849783ed
--- /dev/null
+++ b/synapse/handlers/pagination.py
@@ -0,0 +1,265 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2017 - 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from twisted.internet import defer
+from twisted.python.failure import Failure
+
+from synapse.api.constants import Membership
+from synapse.api.errors import SynapseError
+from synapse.events.utils import serialize_event
+from synapse.types import RoomStreamToken
+from synapse.util.async import ReadWriteLock
+from synapse.util.logcontext import run_in_background
+from synapse.util.stringutils import random_string
+from synapse.visibility import filter_events_for_client
+
+logger = logging.getLogger(__name__)
+
+
+class PurgeStatus(object):
+ """Object tracking the status of a purge request
+
+ This class contains information on the progress of a purge request, for
+ return by get_purge_status.
+
+ Attributes:
+ status (int): Tracks whether this request has completed. One of
+ STATUS_{ACTIVE,COMPLETE,FAILED}
+ """
+
+ STATUS_ACTIVE = 0
+ STATUS_COMPLETE = 1
+ STATUS_FAILED = 2
+
+ STATUS_TEXT = {
+ STATUS_ACTIVE: "active",
+ STATUS_COMPLETE: "complete",
+ STATUS_FAILED: "failed",
+ }
+
+ def __init__(self):
+ self.status = PurgeStatus.STATUS_ACTIVE
+
+ def asdict(self):
+ return {
+ "status": PurgeStatus.STATUS_TEXT[self.status]
+ }
+
+
+class PaginationHandler(object):
+ """Handles pagination and purge history requests.
+
+ These are in the same handler due to the fact we need to block clients
+ paginating during a purge.
+ """
+
+ def __init__(self, hs):
+ self.hs = hs
+ self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
+ self.clock = hs.get_clock()
+
+ self.pagination_lock = ReadWriteLock()
+ self._purges_in_progress_by_room = set()
+ # map from purge id to PurgeStatus
+ self._purges_by_id = {}
+
+ def start_purge_history(self, room_id, token,
+ delete_local_events=False):
+ """Start off a history purge on a room.
+
+ Args:
+ room_id (str): The room to purge from
+
+ token (str): topological token to delete events before
+ delete_local_events (bool): True to delete local events as well as
+ remote ones
+
+ Returns:
+ str: unique ID for this purge transaction.
+ """
+ if room_id in self._purges_in_progress_by_room:
+ raise SynapseError(
+ 400,
+ "History purge already in progress for %s" % (room_id, ),
+ )
+
+ purge_id = random_string(16)
+
+ # we log the purge_id here so that it can be tied back to the
+ # request id in the log lines.
+ logger.info("[purge] starting purge_id %s", purge_id)
+
+ self._purges_by_id[purge_id] = PurgeStatus()
+ run_in_background(
+ self._purge_history,
+ purge_id, room_id, token, delete_local_events,
+ )
+ return purge_id
+
+ @defer.inlineCallbacks
+ def _purge_history(self, purge_id, room_id, token,
+ delete_local_events):
+ """Carry out a history purge on a room.
+
+ Args:
+ purge_id (str): The id for this purge
+ room_id (str): The room to purge from
+ token (str): topological token to delete events before
+ delete_local_events (bool): True to delete local events as well as
+ remote ones
+
+ Returns:
+ Deferred
+ """
+ self._purges_in_progress_by_room.add(room_id)
+ try:
+ with (yield self.pagination_lock.write(room_id)):
+ yield self.store.purge_history(
+ room_id, token, delete_local_events,
+ )
+ logger.info("[purge] complete")
+ self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+ except Exception:
+ logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+ self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+ finally:
+ self._purges_in_progress_by_room.discard(room_id)
+
+ # remove the purge from the list 24 hours after it completes
+ def clear_purge():
+ del self._purges_by_id[purge_id]
+ self.hs.get_reactor().callLater(24 * 3600, clear_purge)
+
+ def get_purge_status(self, purge_id):
+ """Get the current status of an active purge
+
+ Args:
+ purge_id (str): purge_id returned by start_purge_history
+
+ Returns:
+ PurgeStatus|None
+ """
+ return self._purges_by_id.get(purge_id)
+
+ @defer.inlineCallbacks
+ def get_messages(self, requester, room_id=None, pagin_config=None,
+ as_client_event=True, event_filter=None):
+ """Get messages in a room.
+
+ Args:
+ requester (Requester): The user requesting messages.
+ room_id (str): The room they want messages from.
+ pagin_config (synapse.api.streams.PaginationConfig): The pagination
+ config rules to apply, if any.
+ as_client_event (bool): True to get events in client-server format.
+ event_filter (Filter): Filter to apply to results or None
+ Returns:
+ dict: Pagination API results
+ """
+ user_id = requester.user.to_string()
+
+ if pagin_config.from_token:
+ room_token = pagin_config.from_token.room_key
+ else:
+ pagin_config.from_token = (
+ yield self.hs.get_event_sources().get_current_token_for_room(
+ room_id=room_id
+ )
+ )
+ room_token = pagin_config.from_token.room_key
+
+ room_token = RoomStreamToken.parse(room_token)
+
+ pagin_config.from_token = pagin_config.from_token.copy_and_replace(
+ "room_key", str(room_token)
+ )
+
+ source_config = pagin_config.get_source_config("room")
+
+ with (yield self.pagination_lock.read(room_id)):
+ membership, member_event_id = yield self.auth.check_in_room_or_world_readable(
+ room_id, user_id
+ )
+
+ if source_config.direction == 'b':
+ # if we're going backwards, we might need to backfill. This
+ # requires that we have a topo token.
+ if room_token.topological:
+ max_topo = room_token.topological
+ else:
+ max_topo = yield self.store.get_max_topological_token(
+ room_id, room_token.stream
+ )
+
+ if membership == Membership.LEAVE:
+ # If they have left the room then clamp the token to be before
+ # they left the room, to save the effort of loading from the
+ # database.
+ leave_token = yield self.store.get_topological_token_for_event(
+ member_event_id
+ )
+ leave_token = RoomStreamToken.parse(leave_token)
+ if leave_token.topological < max_topo:
+ source_config.from_key = str(leave_token)
+
+ yield self.hs.get_handlers().federation_handler.maybe_backfill(
+ room_id, max_topo
+ )
+
+ events, next_key = yield self.store.paginate_room_events(
+ room_id=room_id,
+ from_key=source_config.from_key,
+ to_key=source_config.to_key,
+ direction=source_config.direction,
+ limit=source_config.limit,
+ event_filter=event_filter,
+ )
+
+ next_token = pagin_config.from_token.copy_and_replace(
+ "room_key", next_key
+ )
+
+ if not events:
+ defer.returnValue({
+ "chunk": [],
+ "start": pagin_config.from_token.to_string(),
+ "end": next_token.to_string(),
+ })
+
+ if event_filter:
+ events = event_filter.filter(events)
+
+ events = yield filter_events_for_client(
+ self.store,
+ user_id,
+ events,
+ is_peeking=(member_event_id is None),
+ )
+
+ time_now = self.clock.time_msec()
+
+ chunk = {
+ "chunk": [
+ serialize_event(e, time_now, as_client_event)
+ for e in events
+ ],
+ "start": pagin_config.from_token.to_string(),
+ "end": next_token.to_string(),
+ }
+
+ defer.returnValue(chunk)
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 859f6d2b2e..cb5c6d587e 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -18,6 +18,7 @@ import logging
from twisted.internet import defer
from synapse.api.errors import AuthError, CodeMessageException, SynapseError
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import UserID, get_domain_from_id
from ._base import BaseHandler
@@ -41,7 +42,7 @@ class ProfileHandler(BaseHandler):
if hs.config.worker_app is None:
self.clock.looping_call(
- self._update_remote_profile_cache, self.PROFILE_UPDATE_MS,
+ self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS,
)
@defer.inlineCallbacks
@@ -254,6 +255,12 @@ class ProfileHandler(BaseHandler):
room_id, str(e.message)
)
+ def _start_update_remote_profile_cache(self):
+ return run_as_background_process(
+ "Update remote profile", self._update_remote_profile_cache,
+ )
+
+ @defer.inlineCallbacks
def _update_remote_profile_cache(self):
"""Called periodically to check profiles of remote users we haven't
checked in a while.
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index f67512078b..003b848c00 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -24,7 +24,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
-from synapse.types import RoomAlias, RoomID, RoomStreamToken, UserID
+from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
from synapse.visibility import filter_events_for_client
@@ -395,7 +395,11 @@ class RoomCreationHandler(BaseHandler):
)
-class RoomContextHandler(BaseHandler):
+class RoomContextHandler(object):
+ def __init__(self, hs):
+ self.hs = hs
+ self.store = hs.get_datastore()
+
@defer.inlineCallbacks
def get_event_context(self, user, room_id, event_id, limit):
"""Retrieves events, pagination tokens and state around a given event
@@ -414,8 +418,6 @@ class RoomContextHandler(BaseHandler):
before_limit = math.floor(limit / 2.)
after_limit = limit - before_limit
- now_token = yield self.hs.get_event_sources().get_current_token()
-
users = yield self.store.get_users_in_room(room_id)
is_peeking = user.to_string() not in users
@@ -458,11 +460,15 @@ class RoomContextHandler(BaseHandler):
)
results["state"] = list(state[last_event_id].values())
- results["start"] = now_token.copy_and_replace(
+ # We use a dummy token here as we only care about the room portion of
+ # the token, which we replace.
+ token = StreamToken.START
+
+ results["start"] = token.copy_and_replace(
"room_key", results["start"]
).to_string()
- results["end"] = now_token.copy_and_replace(
+ results["end"] = token.copy_and_replace(
"room_key", results["end"]
).to_string()
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 00f2e279bc..a832d91809 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -201,7 +201,9 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
)
- prev_member_event_id = context.prev_state_ids.get(
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, target.to_string()),
None
)
@@ -496,9 +498,10 @@ class RoomMemberHandler(object):
if prev_event is not None:
return
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
if event.membership == Membership.JOIN:
if requester.is_guest:
- guest_can_join = yield self._can_guest_join(context.prev_state_ids)
+ guest_can_join = yield self._can_guest_join(prev_state_ids)
if not guest_can_join:
# This should be an auth check, but guests are a local concept,
# so don't really fit into the general auth process.
@@ -517,7 +520,7 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
)
- prev_member_event_id = context.prev_state_ids.get(
+ prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, event.state_key),
None
)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index c24e35362a..dff1f67dcb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
-# Copyright 2015 - 2016 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -25,6 +26,8 @@ from synapse.api.constants import EventTypes, Membership
from synapse.push.clientformat import format_push_rules_for_user
from synapse.types import RoomStreamToken
from synapse.util.async import concurrently_execute
+from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.response_cache import ResponseCache
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure, measure_func
@@ -32,6 +35,14 @@ from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)
+# Store the cache that tracks which lazy-loaded members have been sent to a given
+# client for no more than 30 minutes.
+LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
+
+# Remember the last 100 members we sent to a client for the purposes of
+# avoiding redundantly sending the same lazy-loaded members to the client
+LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
+
SyncConfig = collections.namedtuple("SyncConfig", [
"user",
@@ -181,6 +192,12 @@ class SyncHandler(object):
self.response_cache = ResponseCache(hs, "sync")
self.state = hs.get_state_handler()
+ # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+ self.lazy_loaded_members_cache = ExpiringCache(
+ "lazy_loaded_members_cache", self.clock,
+ max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
+ )
+
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
@@ -416,29 +433,44 @@ class SyncHandler(object):
))
@defer.inlineCallbacks
- def get_state_after_event(self, event):
+ def get_state_after_event(self, event, types=None, filtered_types=None):
"""
Get the room state after the given event
Args:
event(synapse.events.EventBase): event of interest
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ May be None, which matches any key.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
A Deferred map from ((type, state_key)->Event)
"""
- state_ids = yield self.store.get_state_ids_for_event(event.event_id)
+ state_ids = yield self.store.get_state_ids_for_event(
+ event.event_id, types, filtered_types=filtered_types,
+ )
if event.is_state():
state_ids = state_ids.copy()
state_ids[(event.type, event.state_key)] = event.event_id
defer.returnValue(state_ids)
@defer.inlineCallbacks
- def get_state_at(self, room_id, stream_position):
+ def get_state_at(self, room_id, stream_position, types=None, filtered_types=None):
""" Get the room state at a particular stream position
Args:
room_id(str): room for which to get state
stream_position(StreamToken): point at which to get state
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
A Deferred map from ((type, state_key)->Event)
@@ -453,7 +485,9 @@ class SyncHandler(object):
if last_events:
last_event = last_events[-1]
- state = yield self.get_state_after_event(last_event)
+ state = yield self.get_state_after_event(
+ last_event, types, filtered_types=filtered_types,
+ )
else:
# no events in this room - so presumably no state
@@ -485,59 +519,129 @@ class SyncHandler(object):
# TODO(mjark) Check for new redactions in the state events.
with Measure(self.clock, "compute_state_delta"):
+
+ types = None
+ filtered_types = None
+
+ lazy_load_members = sync_config.filter_collection.lazy_load_members()
+ include_redundant_members = (
+ sync_config.filter_collection.include_redundant_members()
+ )
+
+ if lazy_load_members:
+ # We only request state for the members needed to display the
+ # timeline:
+
+ types = [
+ (EventTypes.Member, state_key)
+ for state_key in set(
+ event.sender # FIXME: we also care about invite targets etc.
+ for event in batch.events
+ )
+ ]
+
+ # only apply the filtering to room members
+ filtered_types = [EventTypes.Member]
+
+ timeline_state = {
+ (event.type, event.state_key): event.event_id
+ for event in batch.events if event.is_state()
+ }
+
if full_state:
if batch:
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id
+ batch.events[-1].event_id, types=types,
+ filtered_types=filtered_types,
)
state_ids = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id
+ batch.events[0].event_id, types=types,
+ filtered_types=filtered_types,
)
+
else:
current_state_ids = yield self.get_state_at(
- room_id, stream_position=now_token
+ room_id, stream_position=now_token, types=types,
+ filtered_types=filtered_types,
)
state_ids = current_state_ids
- timeline_state = {
- (event.type, event.state_key): event.event_id
- for event in batch.events if event.is_state()
- }
-
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_ids,
previous={},
current=current_state_ids,
+ lazy_load_members=lazy_load_members,
)
elif batch.limited:
state_at_previous_sync = yield self.get_state_at(
- room_id, stream_position=since_token
+ room_id, stream_position=since_token, types=types,
+ filtered_types=filtered_types,
)
current_state_ids = yield self.store.get_state_ids_for_event(
- batch.events[-1].event_id
+ batch.events[-1].event_id, types=types,
+ filtered_types=filtered_types,
)
state_at_timeline_start = yield self.store.get_state_ids_for_event(
- batch.events[0].event_id
+ batch.events[0].event_id, types=types,
+ filtered_types=filtered_types,
)
- timeline_state = {
- (event.type, event.state_key): event.event_id
- for event in batch.events if event.is_state()
- }
-
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
previous=state_at_previous_sync,
current=current_state_ids,
+ lazy_load_members=lazy_load_members,
)
else:
state_ids = {}
+ if lazy_load_members:
+ if types:
+ state_ids = yield self.store.get_state_ids_for_event(
+ batch.events[0].event_id, types=types,
+ filtered_types=filtered_types,
+ )
+
+ if lazy_load_members and not include_redundant_members:
+ cache_key = (sync_config.user.to_string(), sync_config.device_id)
+ cache = self.lazy_loaded_members_cache.get(cache_key)
+ if cache is None:
+ logger.debug("creating LruCache for %r", cache_key)
+ cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
+ self.lazy_loaded_members_cache[cache_key] = cache
+ else:
+ logger.debug("found LruCache for %r", cache_key)
+
+ # if it's a new sync sequence, then assume the client has had
+ # amnesia and doesn't want any recent lazy-loaded members
+ # de-duplicated.
+ if since_token is None:
+ logger.debug("clearing LruCache for %r", cache_key)
+ cache.clear()
+ else:
+ # only send members which aren't in our LruCache (either
+ # because they're new to this client or have been pushed out
+ # of the cache)
+ logger.debug("filtering state from %r...", state_ids)
+ state_ids = {
+ t: event_id
+ for t, event_id in state_ids.iteritems()
+ if cache.get(t[1]) != event_id
+ }
+ logger.debug("...to %r", state_ids)
+
+ # add any member IDs we are about to send into our LruCache
+ for t, event_id in itertools.chain(
+ state_ids.items(),
+ timeline_state.items(),
+ ):
+ if t[0] == EventTypes.Member:
+ cache.set(t[1], event_id)
state = {}
if state_ids:
@@ -1448,7 +1552,9 @@ def _action_has_highlight(actions):
return False
-def _calculate_state(timeline_contains, timeline_start, previous, current):
+def _calculate_state(
+ timeline_contains, timeline_start, previous, current, lazy_load_members,
+):
"""Works out what state to include in a sync response.
Args:
@@ -1457,6 +1563,9 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
previous (dict): state at the end of the previous sync (or empty dict
if this is an initial sync)
current (dict): state at the end of the timeline
+ lazy_load_members (bool): whether to return members from timeline_start
+ or not. assumes that timeline_start has already been filtered to
+ include only the members the client needs to know about.
Returns:
dict
@@ -1472,9 +1581,25 @@ def _calculate_state(timeline_contains, timeline_start, previous, current):
}
c_ids = set(e for e in current.values())
- tc_ids = set(e for e in timeline_contains.values())
- p_ids = set(e for e in previous.values())
ts_ids = set(e for e in timeline_start.values())
+ p_ids = set(e for e in previous.values())
+ tc_ids = set(e for e in timeline_contains.values())
+
+ # If we are lazyloading room members, we explicitly add the membership events
+ # for the senders in the timeline into the state block returned by /sync,
+ # as we may not have sent them to the client before. We find these membership
+ # events by filtering them out of timeline_start, which has already been filtered
+ # to only include membership events for the senders in the timeline.
+ # In practice, we can do this by removing them from the p_ids list,
+ # which is the list of relevant state we know we have already sent to the client.
+ # see https://github.com/matrix-org/synapse/pull/2970
+ # /files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
+
+ if lazy_load_members:
+ p_ids.difference_update(
+ e for t, e in timeline_start.iteritems()
+ if t[0] == EventTypes.Member
+ )
state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
diff --git a/synapse/http/client.py b/synapse/http/client.py
index d6a0d75b2b..25b6307884 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -26,9 +26,11 @@ from OpenSSL.SSL import VERIFY_NONE
from twisted.internet import defer, protocol, reactor, ssl, task
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, BrowserLikeRedirectAgent, ContentDecoderAgent
-from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer
from twisted.web.client import (
+ Agent,
+ BrowserLikeRedirectAgent,
+ ContentDecoderAgent,
+ FileBodyProducer as TwistedFileBodyProducer,
GzipDecoder,
HTTPConnectionPool,
PartialDownloadError,
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index f24b4b949c..588e280571 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -38,7 +38,8 @@ outgoing_responses_counter = Counter(
)
response_timer = Histogram(
- "synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"]
+ "synapse_http_server_response_time_seconds", "sec",
+ ["method", "servlet", "tag", "code"],
)
response_ru_utime = Counter(
@@ -171,11 +172,13 @@ class RequestMetrics(object):
)
return
- outgoing_responses_counter.labels(request.method, str(request.code)).inc()
+ response_code = str(request.code)
+
+ outgoing_responses_counter.labels(request.method, response_code).inc()
response_count.labels(request.method, self.name, tag).inc()
- response_timer.labels(request.method, self.name, tag).observe(
+ response_timer.labels(request.method, self.name, tag, response_code).observe(
time_sec - self.start
)
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 21e26f9c5e..5fd30a4c2c 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -20,7 +20,7 @@ from twisted.web.server import Request, Site
from synapse.http import redact_uri
from synapse.http.request_metrics import RequestMetrics
-from synapse.util.logcontext import LoggingContext, ContextResourceUsage
+from synapse.util.logcontext import ContextResourceUsage, LoggingContext
logger = logging.getLogger(__name__)
@@ -42,9 +42,10 @@ class SynapseRequest(Request):
which is handling the request, and returns a context manager.
"""
- def __init__(self, site, *args, **kw):
- Request.__init__(self, *args, **kw)
+ def __init__(self, site, channel, *args, **kw):
+ Request.__init__(self, channel, *args, **kw)
self.site = site
+ self._channel = channel
self.authenticated_entity = None
self.start_time = 0
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
new file mode 100644
index 0000000000..ce678d5f75
--- /dev/null
+++ b/synapse/metrics/background_process_metrics.py
@@ -0,0 +1,185 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import six
+
+from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
+
+from twisted.internet import defer
+
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+
+_background_process_start_count = Counter(
+ "synapse_background_process_start_count",
+ "Number of background processes started",
+ ["name"],
+)
+
+# we set registry=None in all of these to stop them getting registered with
+# the default registry. Instead we collect them all via the CustomCollector,
+# which ensures that we can update them before they are collected.
+#
+_background_process_ru_utime = Counter(
+ "synapse_background_process_ru_utime_seconds",
+ "User CPU time used by background processes, in seconds",
+ ["name"],
+ registry=None,
+)
+
+_background_process_ru_stime = Counter(
+ "synapse_background_process_ru_stime_seconds",
+ "System CPU time used by background processes, in seconds",
+ ["name"],
+ registry=None,
+)
+
+_background_process_db_txn_count = Counter(
+ "synapse_background_process_db_txn_count",
+ "Number of database transactions done by background processes",
+ ["name"],
+ registry=None,
+)
+
+_background_process_db_txn_duration = Counter(
+ "synapse_background_process_db_txn_duration_seconds",
+ ("Seconds spent by background processes waiting for database "
+ "transactions, excluding scheduling time"),
+ ["name"],
+ registry=None,
+)
+
+_background_process_db_sched_duration = Counter(
+ "synapse_background_process_db_sched_duration_seconds",
+ "Seconds spent by background processes waiting for database connections",
+ ["name"],
+ registry=None,
+)
+
+# map from description to a counter, so that we can name our logcontexts
+# incrementally. (It actually duplicates _background_process_start_count, but
+# it's much simpler to do so than to try to combine them.)
+_background_process_counts = dict() # type: dict[str, int]
+
+# map from description to the currently running background processes.
+#
+# it's kept as a dict of sets rather than a big set so that we can keep track
+# of process descriptions that no longer have any active processes.
+_background_processes = dict() # type: dict[str, set[_BackgroundProcess]]
+
+
+class _Collector(object):
+ """A custom metrics collector for the background process metrics.
+
+ Ensures that all of the metrics are up-to-date with any in-flight processes
+ before they are returned.
+ """
+ def collect(self):
+ background_process_in_flight_count = GaugeMetricFamily(
+ "synapse_background_process_in_flight_count",
+ "Number of background processes in flight",
+ labels=["name"],
+ )
+
+ for desc, processes in six.iteritems(_background_processes):
+ background_process_in_flight_count.add_metric(
+ (desc,), len(processes),
+ )
+ for process in processes:
+ process.update_metrics()
+
+ yield background_process_in_flight_count
+
+ # now we need to run collect() over each of the static Counters, and
+ # yield each metric they return.
+ for m in (
+ _background_process_ru_utime,
+ _background_process_ru_stime,
+ _background_process_db_txn_count,
+ _background_process_db_txn_duration,
+ _background_process_db_sched_duration,
+ ):
+ for r in m.collect():
+ yield r
+
+
+REGISTRY.register(_Collector())
+
+
+class _BackgroundProcess(object):
+ def __init__(self, desc, ctx):
+ self.desc = desc
+ self._context = ctx
+ self._reported_stats = None
+
+ def update_metrics(self):
+ """Updates the metrics with values from this process."""
+ new_stats = self._context.get_resource_usage()
+ if self._reported_stats is None:
+ diff = new_stats
+ else:
+ diff = new_stats - self._reported_stats
+ self._reported_stats = new_stats
+
+ _background_process_ru_utime.labels(self.desc).inc(diff.ru_utime)
+ _background_process_ru_stime.labels(self.desc).inc(diff.ru_stime)
+ _background_process_db_txn_count.labels(self.desc).inc(
+ diff.db_txn_count,
+ )
+ _background_process_db_txn_duration.labels(self.desc).inc(
+ diff.db_txn_duration_sec,
+ )
+ _background_process_db_sched_duration.labels(self.desc).inc(
+ diff.db_sched_duration_sec,
+ )
+
+
+def run_as_background_process(desc, func, *args, **kwargs):
+ """Run the given function in its own logcontext, with resource metrics
+
+ This should be used to wrap processes which are fired off to run in the
+ background, instead of being associated with a particular request.
+
+ It returns a Deferred which completes when the function completes, but it doesn't
+ follow the synapse logcontext rules, which makes it appropriate for passing to
+ clock.looping_call and friends (or for firing-and-forgetting in the middle of a
+ normal synapse inlineCallbacks function).
+
+ Args:
+ desc (str): a description for this background process type
+ func: a function, which may return a Deferred
+ args: positional args for func
+ kwargs: keyword args for func
+
+ Returns: Deferred which returns the result of func, but note that it does not
+ follow the synapse logcontext rules.
+ """
+ @defer.inlineCallbacks
+ def run():
+ count = _background_process_counts.get(desc, 0)
+ _background_process_counts[desc] = count + 1
+ _background_process_start_count.labels(desc).inc()
+
+ with LoggingContext(desc) as context:
+ context.request = "%s-%i" % (desc, count)
+ proc = _BackgroundProcess(desc, context)
+ _background_processes.setdefault(desc, set()).add(proc)
+ try:
+ yield func(*args, **kwargs)
+ finally:
+ proc.update_metrics()
+ _background_processes[desc].remove(proc)
+
+ with PreserveLoggingContext():
+ return run()
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 51cbd66f06..e650c3e494 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -274,7 +274,7 @@ class Notifier(object):
logger.exception("Error notifying application services of event")
def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
- """ Used to inform listeners that something has happend event wise.
+ """ Used to inform listeners that something has happened event wise.
Will wake up all listeners for the given users and rooms.
"""
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index bb181d94ee..1d14d3639c 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -112,7 +112,8 @@ class BulkPushRuleEvaluator(object):
@defer.inlineCallbacks
def _get_power_levels_and_sender_level(self, event, context):
- pl_event_id = context.prev_state_ids.get(POWER_KEY)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ pl_event_id = prev_state_ids.get(POWER_KEY)
if pl_event_id:
# fastpath: if there's a power level event, that's all we need, and
# not having a power level event is an extreme edge case
@@ -120,7 +121,7 @@ class BulkPushRuleEvaluator(object):
auth_events = {POWER_KEY: pl_event}
else:
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=False,
+ event, prev_state_ids, for_verification=False,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -304,7 +305,7 @@ class RulesForRoom(object):
push_rules_delta_state_cache_metric.inc_hits()
else:
- current_state_ids = context.current_state_ids
+ current_state_ids = yield context.get_current_state_ids(self.store)
push_rules_delta_state_cache_metric.inc_misses()
push_rules_state_size_counter.inc(len(current_state_ids))
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 2eede54792..5227bc333d 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -34,12 +34,13 @@ logger = logging.getLogger(__name__)
@defer.inlineCallbacks
-def send_event_to_master(clock, client, host, port, requester, event, context,
+def send_event_to_master(clock, store, client, host, port, requester, event, context,
ratelimit, extra_users):
"""Send event to be handled on the master
Args:
clock (synapse.util.Clock)
+ store (DataStore)
client (SimpleHttpClient)
host (str): host of master
port (int): port on master listening for HTTP replication
@@ -53,11 +54,13 @@ def send_event_to_master(clock, client, host, port, requester, event, context,
host, port, event.event_id,
)
+ serialized_context = yield context.serialize(event, store)
+
payload = {
"event": event.get_pdu_json(),
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
- "context": context.serialize(event),
+ "context": serialized_context,
"requester": requester.serialize(),
"ratelimit": ratelimit,
"extra_users": [u.to_string() for u in extra_users],
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e592ab57bf..970e94313e 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -192,7 +192,7 @@ class ReplicationClientHandler(object):
"""Returns a deferred that is resolved when we receive a SYNC command
with given data.
- Used by tests.
+ [Not currently] used by tests.
"""
return self.awaiting_syncs.setdefault(data, defer.Deferred())
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 611fb66e1d..fd59f1595f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -25,6 +25,7 @@ from twisted.internet import defer
from twisted.internet.protocol import Factory
from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.metrics import Measure, measure_func
from .protocol import ServerReplicationStreamProtocol
@@ -117,7 +118,6 @@ class ReplicationStreamer(object):
for conn in self.connections:
conn.send_error("server shutting down")
- @defer.inlineCallbacks
def on_notifier_poke(self):
"""Checks if there is actually any new data and sends it to the
connections if there are.
@@ -132,14 +132,16 @@ class ReplicationStreamer(object):
stream.discard_updates_and_advance()
return
- # If we're in the process of checking for new updates, mark that fact
- # and return
+ self.pending_updates = True
+
if self.is_looping:
- logger.debug("Noitifier poke loop already running")
- self.pending_updates = True
+ logger.debug("Notifier poke loop already running")
return
- self.pending_updates = True
+ run_as_background_process("replication_notifier", self._run_notifier_loop)
+
+ @defer.inlineCallbacks
+ def _run_notifier_loop(self):
self.is_looping = True
try:
diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py
index 75c2a4ec8e..3418f06fd6 100644
--- a/synapse/rest/__init__.py
+++ b/synapse/rest/__init__.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,13 +14,24 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from six import PY3
+
from synapse.http.server import JsonResource
from synapse.rest.client import versions
-from synapse.rest.client.v1 import admin, directory, events, initial_sync
-from synapse.rest.client.v1 import login as v1_login
-from synapse.rest.client.v1 import logout, presence, profile, push_rule, pusher
-from synapse.rest.client.v1 import register as v1_register
-from synapse.rest.client.v1 import room, voip
+from synapse.rest.client.v1 import (
+ admin,
+ directory,
+ events,
+ initial_sync,
+ login as v1_login,
+ logout,
+ presence,
+ profile,
+ push_rule,
+ pusher,
+ room,
+ voip,
+)
from synapse.rest.client.v2_alpha import (
account,
account_data,
@@ -42,6 +54,11 @@ from synapse.rest.client.v2_alpha import (
user_directory,
)
+if not PY3:
+ from synapse.rest.client.v1_only import (
+ register as v1_register,
+ )
+
class ClientRestResource(JsonResource):
"""A resource for version 1 of the matrix client API."""
@@ -54,14 +71,22 @@ class ClientRestResource(JsonResource):
def register_servlets(client_resource, hs):
versions.register_servlets(client_resource)
- # "v1"
- room.register_servlets(hs, client_resource)
+ if not PY3:
+ # "v1" (Python 2 only)
+ v1_register.register_servlets(hs, client_resource)
+
+ # Deprecated in r0
+ initial_sync.register_servlets(hs, client_resource)
+ room.register_deprecated_servlets(hs, client_resource)
+
+ # Partially deprecated in r0
events.register_servlets(hs, client_resource)
- v1_register.register_servlets(hs, client_resource)
+
+ # "v1" + "r0"
+ room.register_servlets(hs, client_resource)
v1_login.register_servlets(hs, client_resource)
profile.register_servlets(hs, client_resource)
presence.register_servlets(hs, client_resource)
- initial_sync.register_servlets(hs, client_resource)
directory.register_servlets(hs, client_resource)
voip.register_servlets(hs, client_resource)
admin.register_servlets(hs, client_resource)
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index 01c3f2eb04..99f6c6e3c3 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import hashlib
+import hmac
import logging
from six.moves import http_client
@@ -24,9 +26,9 @@ from synapse.api.constants import Membership
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
from synapse.http.servlet import (
assert_params_in_dict,
- parse_json_object_from_request,
parse_integer,
- parse_string
+ parse_json_object_from_request,
+ parse_string,
)
from synapse.types import UserID, create_requester
@@ -63,6 +65,125 @@ class UsersRestServlet(ClientV1RestServlet):
defer.returnValue((200, ret))
+class UserRegisterServlet(ClientV1RestServlet):
+ """
+ Attributes:
+ NONCE_TIMEOUT (int): Seconds until a generated nonce won't be accepted
+ nonces (dict[str, int]): The nonces that we will accept. A dict of
+ nonce to the time it was generated, in int seconds.
+ """
+ PATTERNS = client_path_patterns("/admin/register")
+ NONCE_TIMEOUT = 60
+
+ def __init__(self, hs):
+ super(UserRegisterServlet, self).__init__(hs)
+ self.handlers = hs.get_handlers()
+ self.reactor = hs.get_reactor()
+ self.nonces = {}
+ self.hs = hs
+
+ def _clear_old_nonces(self):
+ """
+ Clear out old nonces that are older than NONCE_TIMEOUT.
+ """
+ now = int(self.reactor.seconds())
+
+ for k, v in list(self.nonces.items()):
+ if now - v > self.NONCE_TIMEOUT:
+ del self.nonces[k]
+
+ def on_GET(self, request):
+ """
+ Generate a new nonce.
+ """
+ self._clear_old_nonces()
+
+ nonce = self.hs.get_secrets().token_hex(64)
+ self.nonces[nonce] = int(self.reactor.seconds())
+ return (200, {"nonce": nonce.encode('ascii')})
+
+ @defer.inlineCallbacks
+ def on_POST(self, request):
+ self._clear_old_nonces()
+
+ if not self.hs.config.registration_shared_secret:
+ raise SynapseError(400, "Shared secret registration is not enabled")
+
+ body = parse_json_object_from_request(request)
+
+ if "nonce" not in body:
+ raise SynapseError(
+ 400, "nonce must be specified", errcode=Codes.BAD_JSON,
+ )
+
+ nonce = body["nonce"]
+
+ if nonce not in self.nonces:
+ raise SynapseError(
+ 400, "unrecognised nonce",
+ )
+
+ # Delete the nonce, so it can't be reused, even if it's invalid
+ del self.nonces[nonce]
+
+ if "username" not in body:
+ raise SynapseError(
+ 400, "username must be specified", errcode=Codes.BAD_JSON,
+ )
+ else:
+ if (not isinstance(body['username'], str) or len(body['username']) > 512):
+ raise SynapseError(400, "Invalid username")
+
+ username = body["username"].encode("utf-8")
+ if b"\x00" in username:
+ raise SynapseError(400, "Invalid username")
+
+ if "password" not in body:
+ raise SynapseError(
+ 400, "password must be specified", errcode=Codes.BAD_JSON,
+ )
+ else:
+ if (not isinstance(body['password'], str) or len(body['password']) > 512):
+ raise SynapseError(400, "Invalid password")
+
+ password = body["password"].encode("utf-8")
+ if b"\x00" in password:
+ raise SynapseError(400, "Invalid password")
+
+ admin = body.get("admin", None)
+ got_mac = body["mac"]
+
+ want_mac = hmac.new(
+ key=self.hs.config.registration_shared_secret.encode(),
+ digestmod=hashlib.sha1,
+ )
+ want_mac.update(nonce)
+ want_mac.update(b"\x00")
+ want_mac.update(username)
+ want_mac.update(b"\x00")
+ want_mac.update(password)
+ want_mac.update(b"\x00")
+ want_mac.update(b"admin" if admin else b"notadmin")
+ want_mac = want_mac.hexdigest()
+
+ if not hmac.compare_digest(want_mac, got_mac):
+ raise SynapseError(
+ 403, "HMAC incorrect",
+ )
+
+ # Reuse the parts of RegisterRestServlet to reduce code duplication
+ from synapse.rest.client.v2_alpha.register import RegisterRestServlet
+ register = RegisterRestServlet(self.hs)
+
+ (user_id, _) = yield register.registration_handler.register(
+ localpart=username.lower(), password=password, admin=bool(admin),
+ generate_token=False,
+ )
+
+ result = yield register._create_registration_details(user_id, body)
+ defer.returnValue((200, result))
+
+
class WhoisRestServlet(ClientV1RestServlet):
PATTERNS = client_path_patterns("/admin/whois/(?P<user_id>[^/]*)")
@@ -123,7 +244,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
hs (synapse.server.HomeServer)
"""
super(PurgeHistoryRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.pagination_handler = hs.get_pagination_handler()
self.store = hs.get_datastore()
@defer.inlineCallbacks
@@ -198,7 +319,7 @@ class PurgeHistoryRestServlet(ClientV1RestServlet):
errcode=Codes.BAD_JSON,
)
- purge_id = yield self.handlers.message_handler.start_purge_history(
+ purge_id = yield self.pagination_handler.start_purge_history(
room_id, token,
delete_local_events=delete_local_events,
)
@@ -220,7 +341,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
hs (synapse.server.HomeServer)
"""
super(PurgeHistoryStatusRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.pagination_handler = hs.get_pagination_handler()
@defer.inlineCallbacks
def on_GET(self, request, purge_id):
@@ -230,7 +351,7 @@ class PurgeHistoryStatusRestServlet(ClientV1RestServlet):
if not is_admin:
raise AuthError(403, "You are not a server admin")
- purge_status = self.handlers.message_handler.get_purge_status(purge_id)
+ purge_status = self.pagination_handler.get_purge_status(purge_id)
if purge_status is None:
raise NotFoundError("purge id '%s' not found" % purge_id)
@@ -614,3 +735,4 @@ def register_servlets(hs, http_server):
ShutdownRoomRestServlet(hs).register(http_server)
QuarantineMediaInRoom(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)
+ UserRegisterServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v1/initial_sync.py b/synapse/rest/client/v1/initial_sync.py
index 00a1a99feb..fd5f85b53e 100644
--- a/synapse/rest/client/v1/initial_sync.py
+++ b/synapse/rest/client/v1/initial_sync.py
@@ -15,8 +15,8 @@
from twisted.internet import defer
-from synapse.streams.config import PaginationConfig
from synapse.http.servlet import parse_boolean
+from synapse.streams.config import PaginationConfig
from .base import ClientV1RestServlet, client_path_patterns
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 3d62447854..b7bd878c90 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -90,6 +90,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
self.handlers = hs.get_handlers()
self.event_creation_hander = hs.get_event_creation_handler()
self.room_member_handler = hs.get_room_member_handler()
+ self.message_handler = hs.get_message_handler()
def register(self, http_server):
# /room/$roomid/state/$eventtype
@@ -124,7 +125,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
format = parse_string(request, "format", default="content",
allowed_values=["content", "event"])
- msg_handler = self.handlers.message_handler
+ msg_handler = self.message_handler
data = yield msg_handler.get_room_data(
user_id=requester.user.to_string(),
room_id=room_id,
@@ -377,14 +378,13 @@ class RoomMemberListRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomMemberListRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
# TODO support Pagination stream API (limit/tokens)
requester = yield self.auth.get_user_by_req(request)
- handler = self.handlers.message_handler
- events = yield handler.get_state_events(
+ events = yield self.message_handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
)
@@ -406,7 +406,7 @@ class JoinedRoomMemberListRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(JoinedRoomMemberListRestServlet, self).__init__(hs)
- self.message_handler = hs.get_handlers().message_handler
+ self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
@@ -427,7 +427,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomMessageListRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.pagination_handler = hs.get_pagination_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
@@ -442,8 +442,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
event_filter = Filter(json.loads(filter_json))
else:
event_filter = None
- handler = self.handlers.message_handler
- msgs = yield handler.get_messages(
+ msgs = yield self.pagination_handler.get_messages(
room_id=room_id,
requester=requester,
pagin_config=pagination_config,
@@ -460,14 +459,13 @@ class RoomStateRestServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomStateRestServlet, self).__init__(hs)
- self.handlers = hs.get_handlers()
+ self.message_handler = hs.get_message_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
- handler = self.handlers.message_handler
# Get all the current state for this room
- events = yield handler.get_state_events(
+ events = yield self.message_handler.get_state_events(
room_id=room_id,
user_id=requester.user.to_string(),
is_guest=requester.is_guest,
@@ -525,7 +523,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
def __init__(self, hs):
super(RoomEventContextServlet, self).__init__(hs)
self.clock = hs.get_clock()
- self.handlers = hs.get_handlers()
+ self.room_context_handler = hs.get_room_context_handler()
@defer.inlineCallbacks
def on_GET(self, request, room_id, event_id):
@@ -533,7 +531,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
limit = parse_integer(request, "limit", default=10)
- results = yield self.handlers.room_context_handler.get_event_context(
+ results = yield self.room_context_handler.get_event_context(
requester.user,
room_id,
event_id,
@@ -832,10 +830,13 @@ def register_servlets(hs, http_server):
RoomSendEventRestServlet(hs).register(http_server)
PublicRoomListRestServlet(hs).register(http_server)
RoomStateRestServlet(hs).register(http_server)
- RoomInitialSyncRestServlet(hs).register(http_server)
RoomRedactEventRestServlet(hs).register(http_server)
RoomTypingRestServlet(hs).register(http_server)
SearchRestServlet(hs).register(http_server)
JoinedRoomsRestServlet(hs).register(http_server)
RoomEventServlet(hs).register(http_server)
RoomEventContextServlet(hs).register(http_server)
+
+
+def register_deprecated_servlets(hs, http_server):
+ RoomInitialSyncRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v1_only/__init__.py b/synapse/rest/client/v1_only/__init__.py
new file mode 100644
index 0000000000..936f902ace
--- /dev/null
+++ b/synapse/rest/client/v1_only/__init__.py
@@ -0,0 +1,3 @@
+"""
+REST APIs that are only used in v1 (the legacy API).
+"""
diff --git a/synapse/rest/client/v1_only/base.py b/synapse/rest/client/v1_only/base.py
new file mode 100644
index 0000000000..9d4db7437c
--- /dev/null
+++ b/synapse/rest/client/v1_only/base.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""This module contains base REST classes for constructing client v1 servlets.
+"""
+
+import re
+
+from synapse.api.urls import CLIENT_PREFIX
+
+
+def v1_only_client_path_patterns(path_regex, include_in_unstable=True):
+ """Creates a regex compiled client path with the correct client path
+ prefix.
+
+ Args:
+ path_regex (str): The regex string to match. This should NOT have a ^
+ as this will be prefixed.
+ Returns:
+ list of SRE_Pattern
+ """
+ patterns = [re.compile("^" + CLIENT_PREFIX + path_regex)]
+ if include_in_unstable:
+ unstable_prefix = CLIENT_PREFIX.replace("/api/v1", "/unstable")
+ patterns.append(re.compile("^" + unstable_prefix + path_regex))
+ return patterns
diff --git a/synapse/rest/client/v1/register.py b/synapse/rest/client/v1_only/register.py
index 25a143af8d..3439c3c6d4 100644
--- a/synapse/rest/client/v1/register.py
+++ b/synapse/rest/client/v1_only/register.py
@@ -24,9 +24,10 @@ import synapse.util.stringutils as stringutils
from synapse.api.constants import LoginType
from synapse.api.errors import Codes, SynapseError
from synapse.http.servlet import assert_params_in_dict, parse_json_object_from_request
+from synapse.rest.client.v1.base import ClientV1RestServlet
from synapse.types import create_requester
-from .base import ClientV1RestServlet, client_path_patterns
+from .base import v1_only_client_path_patterns
logger = logging.getLogger(__name__)
@@ -49,7 +50,7 @@ class RegisterRestServlet(ClientV1RestServlet):
handler doesn't have a concept of multi-stages or sessions.
"""
- PATTERNS = client_path_patterns("/register$", releases=(), include_in_unstable=False)
+ PATTERNS = v1_only_client_path_patterns("/register$", include_in_unstable=False)
def __init__(self, hs):
"""
@@ -379,7 +380,7 @@ class CreateUserRestServlet(ClientV1RestServlet):
"""Handles user creation via a server-to-server interface
"""
- PATTERNS = client_path_patterns("/createUser$", releases=())
+ PATTERNS = v1_only_client_path_patterns("/createUser$")
def __init__(self, hs):
super(CreateUserRestServlet, self).__init__(hs)
diff --git a/synapse/rest/client/v2_alpha/devices.py b/synapse/rest/client/v2_alpha/devices.py
index aded2409be..9b75bb1377 100644
--- a/synapse/rest/client/v2_alpha/devices.py
+++ b/synapse/rest/client/v2_alpha/devices.py
@@ -19,9 +19,9 @@ from twisted.internet import defer
from synapse.api import errors
from synapse.http.servlet import (
+ RestServlet,
assert_params_in_dict,
parse_json_object_from_request,
- RestServlet
)
from ._base import client_v2_patterns, interactive_auth_handler
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 1918897f68..d6cf915d86 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -643,7 +643,7 @@ class RegisterRestServlet(RestServlet):
@defer.inlineCallbacks
def _do_guest_registration(self, params):
if not self.hs.config.allow_guest_access:
- defer.returnValue((403, "Guest access is disabled"))
+ raise SynapseError(403, "Guest access is disabled")
user_id, _ = yield self.registration_handler.register(
generate_token=False,
make_guest=True
diff --git a/synapse/rest/media/v1/identicon_resource.py b/synapse/rest/media/v1/identicon_resource.py
index b3217eff53..bdbd8d50dd 100644
--- a/synapse/rest/media/v1/identicon_resource.py
+++ b/synapse/rest/media/v1/identicon_resource.py
@@ -14,10 +14,10 @@
from pydenticon import Generator
-from synapse.http.servlet import parse_integer
-
from twisted.web.resource import Resource
+from synapse.http.servlet import parse_integer
+
FOREGROUND = [
"rgb(45,79,255)",
"rgb(254,180,44)",
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 30242c525a..174ad20123 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -35,6 +35,7 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.async import Linearizer
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.retryutils import NotRetryingDestination
@@ -100,10 +101,15 @@ class MediaRepository(object):
)
self.clock.looping_call(
- self._update_recently_accessed,
+ self._start_update_recently_accessed,
UPDATE_RECENTLY_ACCESSED_TS,
)
+ def _start_update_recently_accessed(self):
+ return run_as_background_process(
+ "update_recently_accessed_media", self._update_recently_accessed,
+ )
+
@defer.inlineCallbacks
def _update_recently_accessed(self):
remote_media = self.recently_accessed_remotes
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index b70b15c4c2..27aa0def2f 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -41,6 +41,7 @@ from synapse.http.server import (
wrap_json_request_handler,
)
from synapse.http.servlet import parse_integer, parse_string
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.async import ObservableDeferred
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
@@ -81,7 +82,7 @@ class PreviewUrlResource(Resource):
self._cache.start()
self._cleaner_loop = self.clock.looping_call(
- self._expire_url_cache_data, 10 * 1000
+ self._start_expire_url_cache_data, 10 * 1000,
)
def render_OPTIONS(self, request):
@@ -371,6 +372,11 @@ class PreviewUrlResource(Resource):
"etag": headers["ETag"][0] if "ETag" in headers else None,
})
+ def _start_expire_url_cache_data(self):
+ return run_as_background_process(
+ "expire_url_cache_data", self._expire_url_cache_data,
+ )
+
@defer.inlineCallbacks
def _expire_url_cache_data(self):
"""Clean up expired url cache content, media and thumbnails.
diff --git a/synapse/secrets.py b/synapse/secrets.py
new file mode 100644
index 0000000000..f397daaa5e
--- /dev/null
+++ b/synapse/secrets.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Injectable secrets module for Synapse.
+
+See https://docs.python.org/3/library/secrets.html#module-secrets for the API
+used in Python 3.6, and the API emulated in Python 2.7.
+"""
+
+import six
+
+if six.PY3:
+ import secrets
+
+ def Secrets():
+ return secrets
+
+
+else:
+
+ import os
+ import binascii
+
+ class Secrets(object):
+ def token_bytes(self, nbytes=32):
+ return os.urandom(nbytes)
+
+ def token_hex(self, nbytes=32):
+ return binascii.hexlify(self.token_bytes(nbytes))
diff --git a/synapse/server.py b/synapse/server.py
index 92bea96c5c..140be9ebe8 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -52,12 +52,13 @@ from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.events import EventHandler, EventStreamHandler
from synapse.handlers.groups_local import GroupsLocalHandler
from synapse.handlers.initial_sync import InitialSyncHandler
-from synapse.handlers.message import EventCreationHandler
+from synapse.handlers.message import EventCreationHandler, MessageHandler
+from synapse.handlers.pagination import PaginationHandler
from synapse.handlers.presence import PresenceHandler
from synapse.handlers.profile import ProfileHandler
from synapse.handlers.read_marker import ReadMarkerHandler
from synapse.handlers.receipts import ReceiptsHandler
-from synapse.handlers.room import RoomCreationHandler
+from synapse.handlers.room import RoomContextHandler, RoomCreationHandler
from synapse.handlers.room_list import RoomListHandler
from synapse.handlers.room_member import RoomMemberMasterHandler
from synapse.handlers.room_member_worker import RoomMemberWorkerHandler
@@ -74,6 +75,7 @@ from synapse.rest.media.v1.media_repository import (
MediaRepository,
MediaRepositoryResource,
)
+from synapse.secrets import Secrets
from synapse.server_notices.server_notices_manager import ServerNoticesManager
from synapse.server_notices.server_notices_sender import ServerNoticesSender
from synapse.server_notices.worker_server_notices_sender import WorkerServerNoticesSender
@@ -158,11 +160,15 @@ class HomeServer(object):
'groups_server_handler',
'groups_attestation_signing',
'groups_attestation_renewer',
+ 'secrets',
'spam_checker',
'room_member_handler',
'federation_registry',
'server_notices_manager',
'server_notices_sender',
+ 'message_handler',
+ 'pagination_handler',
+ 'room_context_handler',
]
def __init__(self, hostname, reactor=None, **kwargs):
@@ -405,6 +411,9 @@ class HomeServer(object):
def build_groups_attestation_renewer(self):
return GroupAttestionRenewer(self)
+ def build_secrets(self):
+ return Secrets()
+
def build_spam_checker(self):
return SpamChecker(self)
@@ -426,6 +435,15 @@ class HomeServer(object):
return WorkerServerNoticesSender(self)
return ServerNoticesSender(self)
+ def build_message_handler(self):
+ return MessageHandler(self)
+
+ def build_pagination_handler(self):
+ return PaginationHandler(self)
+
+ def build_room_context_handler(self):
+ return RoomContextHandler(self)
+
def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)
diff --git a/synapse/state.py b/synapse/state.py
index 15a593d41c..033f55d967 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -18,7 +18,7 @@ import hashlib
import logging
from collections import namedtuple
-from six import iteritems, itervalues
+from six import iteritems, iterkeys, itervalues
from frozendict import frozendict
@@ -203,25 +203,27 @@ class StateHandler(object):
# If this is an outlier, then we know it shouldn't have any current
# state. Certainly store.get_current_state won't return any, and
# persisting the event won't store the state group.
- context = EventContext()
if old_state:
- context.prev_state_ids = {
+ prev_state_ids = {
(s.type, s.state_key): s.event_id for s in old_state
}
if event.is_state():
- context.current_state_ids = dict(context.prev_state_ids)
+ current_state_ids = dict(prev_state_ids)
key = (event.type, event.state_key)
- context.current_state_ids[key] = event.event_id
+ current_state_ids[key] = event.event_id
else:
- context.current_state_ids = context.prev_state_ids
+ current_state_ids = prev_state_ids
else:
- context.current_state_ids = {}
- context.prev_state_ids = {}
- context.prev_state_events = []
+ current_state_ids = {}
+ prev_state_ids = {}
# We don't store state for outliers, so we don't generate a state
- # froup for it.
- context.state_group = None
+ # group for it.
+ context = EventContext.with_state(
+ state_group=None,
+ current_state_ids=current_state_ids,
+ prev_state_ids=prev_state_ids,
+ )
defer.returnValue(context)
@@ -230,31 +232,35 @@ class StateHandler(object):
# Let's just correctly fill out the context and create a
# new state group for it.
- context = EventContext()
- context.prev_state_ids = {
+ prev_state_ids = {
(s.type, s.state_key): s.event_id for s in old_state
}
if event.is_state():
key = (event.type, event.state_key)
- if key in context.prev_state_ids:
- replaces = context.prev_state_ids[key]
+ if key in prev_state_ids:
+ replaces = prev_state_ids[key]
if replaces != event.event_id: # Paranoia check
event.unsigned["replaces_state"] = replaces
- context.current_state_ids = dict(context.prev_state_ids)
- context.current_state_ids[key] = event.event_id
+ current_state_ids = dict(prev_state_ids)
+ current_state_ids[key] = event.event_id
else:
- context.current_state_ids = context.prev_state_ids
+ current_state_ids = prev_state_ids
- context.state_group = yield self.store.store_state_group(
+ state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
prev_group=None,
delta_ids=None,
- current_state_ids=context.current_state_ids,
+ current_state_ids=current_state_ids,
+ )
+
+ context = EventContext.with_state(
+ state_group=state_group,
+ current_state_ids=current_state_ids,
+ prev_state_ids=prev_state_ids,
)
- context.prev_state_events = []
defer.returnValue(context)
logger.debug("calling resolve_state_groups from compute_event_context")
@@ -262,47 +268,47 @@ class StateHandler(object):
event.room_id, [e for e, _ in event.prev_events],
)
- curr_state = entry.state
+ prev_state_ids = entry.state
+ prev_group = None
+ delta_ids = None
- context = EventContext()
- context.prev_state_ids = curr_state
if event.is_state():
# If this is a state event then we need to create a new state
# group for the state after this event.
key = (event.type, event.state_key)
- if key in context.prev_state_ids:
- replaces = context.prev_state_ids[key]
+ if key in prev_state_ids:
+ replaces = prev_state_ids[key]
event.unsigned["replaces_state"] = replaces
- context.current_state_ids = dict(context.prev_state_ids)
- context.current_state_ids[key] = event.event_id
+ current_state_ids = dict(prev_state_ids)
+ current_state_ids[key] = event.event_id
if entry.state_group:
# If the state at the event has a state group assigned then
# we can use that as the prev group
- context.prev_group = entry.state_group
- context.delta_ids = {
+ prev_group = entry.state_group
+ delta_ids = {
key: event.event_id
}
elif entry.prev_group:
# If the state at the event only has a prev group, then we can
# use that as a prev group too.
- context.prev_group = entry.prev_group
- context.delta_ids = dict(entry.delta_ids)
- context.delta_ids[key] = event.event_id
+ prev_group = entry.prev_group
+ delta_ids = dict(entry.delta_ids)
+ delta_ids[key] = event.event_id
- context.state_group = yield self.store.store_state_group(
+ state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
- prev_group=context.prev_group,
- delta_ids=context.delta_ids,
- current_state_ids=context.current_state_ids,
+ prev_group=prev_group,
+ delta_ids=delta_ids,
+ current_state_ids=current_state_ids,
)
else:
- context.current_state_ids = context.prev_state_ids
- context.prev_group = entry.prev_group
- context.delta_ids = entry.delta_ids
+ current_state_ids = prev_state_ids
+ prev_group = entry.prev_group
+ delta_ids = entry.delta_ids
if entry.state_group is None:
entry.state_group = yield self.store.store_state_group(
@@ -310,13 +316,20 @@ class StateHandler(object):
event.room_id,
prev_group=entry.prev_group,
delta_ids=entry.delta_ids,
- current_state_ids=context.current_state_ids,
+ current_state_ids=current_state_ids,
)
entry.state_id = entry.state_group
- context.state_group = entry.state_group
+ state_group = entry.state_group
+
+ context = EventContext.with_state(
+ state_group=state_group,
+ current_state_ids=current_state_ids,
+ prev_state_ids=prev_state_ids,
+ prev_group=prev_group,
+ delta_ids=delta_ids,
+ )
- context.prev_state_events = []
defer.returnValue(context)
@defer.inlineCallbacks
@@ -458,69 +471,39 @@ class StateResolutionHandler(object):
"Resolving state for %s with %d groups", room_id, len(state_groups_ids)
)
- # build a map from state key to the event_ids which set that state.
- # dict[(str, str), set[str])
- state = {}
+ # start by assuming we won't have any conflicted state, and build up the new
+ # state map by iterating through the state groups. If we discover a conflict,
+ # we give up and instead use `resolve_events_with_factory`.
+ #
+ # XXX: is this actually worthwhile, or should we just let
+ # resolve_events_with_factory do it?
+ new_state = {}
+ conflicted_state = False
for st in itervalues(state_groups_ids):
for key, e_id in iteritems(st):
- state.setdefault(key, set()).add(e_id)
-
- # build a map from state key to the event_ids which set that state,
- # including only those where there are state keys in conflict.
- conflicted_state = {
- k: list(v)
- for k, v in iteritems(state)
- if len(v) > 1
- }
+ if key in new_state:
+ conflicted_state = True
+ break
+ new_state[key] = e_id
+ if conflicted_state:
+ break
if conflicted_state:
logger.info("Resolving conflicted state for %r", room_id)
with Measure(self.clock, "state._resolve_events"):
new_state = yield resolve_events_with_factory(
- list(state_groups_ids.values()),
+ list(itervalues(state_groups_ids)),
event_map=event_map,
state_map_factory=state_map_factory,
)
- else:
- new_state = {
- key: e_ids.pop() for key, e_ids in iteritems(state)
- }
- with Measure(self.clock, "state.create_group_ids"):
- # if the new state matches any of the input state groups, we can
- # use that state group again. Otherwise we will generate a state_id
- # which will be used as a cache key for future resolutions, but
- # not get persisted.
- state_group = None
- new_state_event_ids = frozenset(itervalues(new_state))
- for sg, events in iteritems(state_groups_ids):
- if new_state_event_ids == frozenset(e_id for e_id in events):
- state_group = sg
- break
+ # if the new state matches any of the input state groups, we can
+ # use that state group again. Otherwise we will generate a state_id
+ # which will be used as a cache key for future resolutions, but
+ # not get persisted.
- # TODO: We want to create a state group for this set of events, to
- # increase cache hits, but we need to make sure that it doesn't
- # end up as a prev_group without being added to the database
-
- prev_group = None
- delta_ids = None
- for old_group, old_ids in iteritems(state_groups_ids):
- if not set(new_state) - set(old_ids):
- n_delta_ids = {
- k: v
- for k, v in iteritems(new_state)
- if old_ids.get(k) != v
- }
- if not delta_ids or len(n_delta_ids) < len(delta_ids):
- prev_group = old_group
- delta_ids = n_delta_ids
-
- cache = _StateCacheEntry(
- state=new_state,
- state_group=state_group,
- prev_group=prev_group,
- delta_ids=delta_ids,
- )
+ with Measure(self.clock, "state.create_group_ids"):
+ cache = _make_state_cache_entry(new_state, state_groups_ids)
if self._state_cache is not None:
self._state_cache[group_names] = cache
@@ -528,6 +511,70 @@ class StateResolutionHandler(object):
defer.returnValue(cache)
+def _make_state_cache_entry(
+ new_state,
+ state_groups_ids,
+):
+ """Given a resolved state, and a set of input state groups, pick one to base
+ a new state group on (if any), and return an appropriately-constructed
+ _StateCacheEntry.
+
+ Args:
+ new_state (dict[(str, str), str]): resolved state map (mapping from
+ (type, state_key) to event_id)
+
+ state_groups_ids (dict[int, dict[(str, str), str]]):
+ map from state group id to the state in that state group
+ (where 'state' is a map from state key to event id)
+
+ Returns:
+ _StateCacheEntry
+ """
+ # if the new state matches any of the input state groups, we can
+ # use that state group again. Otherwise we will generate a state_id
+ # which will be used as a cache key for future resolutions, but
+ # not get persisted.
+
+ # first look for exact matches
+ new_state_event_ids = set(itervalues(new_state))
+ for sg, state in iteritems(state_groups_ids):
+ if len(new_state_event_ids) != len(state):
+ continue
+
+ old_state_event_ids = set(itervalues(state))
+ if new_state_event_ids == old_state_event_ids:
+ # got an exact match.
+ return _StateCacheEntry(
+ state=new_state,
+ state_group=sg,
+ )
+
+ # TODO: We want to create a state group for this set of events, to
+ # increase cache hits, but we need to make sure that it doesn't
+ # end up as a prev_group without being added to the database
+
+ # failing that, look for the closest match.
+ prev_group = None
+ delta_ids = None
+
+ for old_group, old_state in iteritems(state_groups_ids):
+ n_delta_ids = {
+ k: v
+ for k, v in iteritems(new_state)
+ if old_state.get(k) != v
+ }
+ if not delta_ids or len(n_delta_ids) < len(delta_ids):
+ prev_group = old_group
+ delta_ids = n_delta_ids
+
+ return _StateCacheEntry(
+ state=new_state,
+ state_group=None,
+ prev_group=prev_group,
+ delta_ids=delta_ids,
+ )
+
+
def _ordered_events(events):
def key_func(e):
return -int(e.depth), hashlib.sha1(e.event_id.encode()).hexdigest()
@@ -569,7 +616,7 @@ def _seperate(state_sets):
with them in different state sets.
Args:
- state_sets(list[dict[(str, str), str]]):
+ state_sets(iterable[dict[(str, str), str]]):
List of dicts of (type, state_key) -> event_id, which are the
different state groups to resolve.
@@ -583,10 +630,11 @@ def _seperate(state_sets):
conflicted_state is a dict mapping (type, state_key) to a set of
event ids for conflicted state keys.
"""
- unconflicted_state = dict(state_sets[0])
+ state_set_iterator = iter(state_sets)
+ unconflicted_state = dict(next(state_set_iterator))
conflicted_state = {}
- for state_set in state_sets[1:]:
+ for state_set in state_set_iterator:
for key, value in iteritems(state_set):
# Check if there is an unconflicted entry for the state key.
unconflicted_value = unconflicted_state.get(key)
@@ -647,7 +695,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
for event_id in event_ids
)
if event_map is not None:
- needed_events -= set(event_map.iterkeys())
+ needed_events -= set(iterkeys(event_map))
logger.info("Asking for %d conflicted events", len(needed_events))
@@ -668,7 +716,7 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
new_needed_events = set(itervalues(auth_events))
new_needed_events -= needed_events
if event_map is not None:
- new_needed_events -= set(event_map.iterkeys())
+ new_needed_events -= set(iterkeys(event_map))
logger.info("Asking for %d auth events", len(new_needed_events))
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 98dde77431..44f37b4c1e 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -311,6 +311,12 @@ class SQLBaseStore(object):
after_callbacks = []
exception_callbacks = []
+ if LoggingContext.current_context() == LoggingContext.sentinel:
+ logger.warn(
+ "Starting db txn '%s' from sentinel context",
+ desc,
+ )
+
try:
result = yield self.runWithConnection(
self._new_transaction,
@@ -344,7 +350,7 @@ class SQLBaseStore(object):
parent_context = LoggingContext.current_context()
if parent_context == LoggingContext.sentinel:
logger.warn(
- "Running db txn from sentinel context: metrics will be lost",
+ "Starting db connection from sentinel context: metrics will be lost",
)
parent_context = None
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index dc9eca7d15..5fe1ca2de7 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -19,6 +19,8 @@ from canonicaljson import json
from twisted.internet import defer
+from synapse.metrics.background_process_metrics import run_as_background_process
+
from . import engines
from ._base import SQLBaseStore
@@ -87,10 +89,14 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_handlers = {}
self._all_done = False
- @defer.inlineCallbacks
def start_doing_background_updates(self):
- logger.info("Starting background schema updates")
+ run_as_background_process(
+ "background_updates", self._run_background_updates,
+ )
+ @defer.inlineCallbacks
+ def _run_background_updates(self):
+ logger.info("Starting background schema updates")
while True:
yield self.hs.get_clock().sleep(
self.BACKGROUND_UPDATE_INTERVAL_MS / 1000.)
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index b78eda3413..b8cefd43d6 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -19,6 +19,7 @@ from six import iteritems
from twisted.internet import defer
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import CACHE_SIZE_FACTOR
from . import background_updates
@@ -93,10 +94,16 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
self._batch_row_update[key] = (user_agent, device_id, now)
def _update_client_ips_batch(self):
- to_update = self._batch_row_update
- self._batch_row_update = {}
- return self.runInteraction(
- "_update_client_ips_batch", self._update_client_ips_batch_txn, to_update
+ def update():
+ to_update = self._batch_row_update
+ self._batch_row_update = {}
+ return self.runInteraction(
+ "_update_client_ips_batch", self._update_client_ips_batch_txn,
+ to_update,
+ )
+
+ return run_as_background_process(
+ "update_client_ips", update,
)
def _update_client_ips_batch_txn(self, txn, to_update):
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index ec68e39f1e..c0943ecf91 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -21,6 +21,7 @@ from canonicaljson import json
from twisted.internet import defer
from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
from ._base import Cache, SQLBaseStore
@@ -248,17 +249,31 @@ class DeviceStore(SQLBaseStore):
def _update_remote_device_list_cache_entry_txn(self, txn, user_id, device_id,
content, stream_id):
- self._simple_upsert_txn(
- txn,
- table="device_lists_remote_cache",
- keyvalues={
- "user_id": user_id,
- "device_id": device_id,
- },
- values={
- "content": json.dumps(content),
- }
- )
+ if content.get("deleted"):
+ self._simple_delete_txn(
+ txn,
+ table="device_lists_remote_cache",
+ keyvalues={
+ "user_id": user_id,
+ "device_id": device_id,
+ },
+ )
+
+ txn.call_after(
+ self.device_id_exists_cache.invalidate, (user_id, device_id,)
+ )
+ else:
+ self._simple_upsert_txn(
+ txn,
+ table="device_lists_remote_cache",
+ keyvalues={
+ "user_id": user_id,
+ "device_id": device_id,
+ },
+ values={
+ "content": json.dumps(content),
+ }
+ )
txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id,))
txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
@@ -366,7 +381,7 @@ class DeviceStore(SQLBaseStore):
now_stream_id = max(stream_id for stream_id in itervalues(query_map))
devices = self._get_e2e_device_keys_txn(
- txn, query_map.keys(), include_all_devices=True
+ txn, query_map.keys(), include_all_devices=True, include_deleted_devices=True
)
prev_sent_id_sql = """
@@ -393,12 +408,15 @@ class DeviceStore(SQLBaseStore):
prev_id = stream_id
- key_json = device.get("key_json", None)
- if key_json:
- result["keys"] = json.loads(key_json)
- device_display_name = device.get("device_display_name", None)
- if device_display_name:
- result["device_display_name"] = device_display_name
+ if device is not None:
+ key_json = device.get("key_json", None)
+ if key_json:
+ result["keys"] = json.loads(key_json)
+ device_display_name = device.get("device_display_name", None)
+ if device_display_name:
+ result["device_display_name"] = device_display_name
+ else:
+ result["deleted"] = True
results.append(result)
@@ -694,6 +712,9 @@ class DeviceStore(SQLBaseStore):
logger.info("Pruned %d device list outbound pokes", txn.rowcount)
- return self.runInteraction(
- "_prune_old_outbound_device_pokes", _prune_txn
+ return run_as_background_process(
+ "prune_old_outbound_device_pokes",
+ self.runInteraction,
+ "_prune_old_outbound_device_pokes",
+ _prune_txn,
)
diff --git a/synapse/storage/end_to_end_keys.py b/synapse/storage/end_to_end_keys.py
index 7ae5c65482..523b4360c3 100644
--- a/synapse/storage/end_to_end_keys.py
+++ b/synapse/storage/end_to_end_keys.py
@@ -64,12 +64,18 @@ class EndToEndKeyStore(SQLBaseStore):
)
@defer.inlineCallbacks
- def get_e2e_device_keys(self, query_list, include_all_devices=False):
+ def get_e2e_device_keys(
+ self, query_list, include_all_devices=False,
+ include_deleted_devices=False,
+ ):
"""Fetch a list of device keys.
Args:
query_list(list): List of pairs of user_ids and device_ids.
include_all_devices (bool): whether to include entries for devices
that don't have device keys
+ include_deleted_devices (bool): whether to include null entries for
+ devices which no longer exist (but were in the query_list).
+ This option only takes effect if include_all_devices is true.
Returns:
Dict mapping from user-id to dict mapping from device_id to
dict containing "key_json", "device_display_name".
@@ -79,7 +85,7 @@ class EndToEndKeyStore(SQLBaseStore):
results = yield self.runInteraction(
"get_e2e_device_keys", self._get_e2e_device_keys_txn,
- query_list, include_all_devices,
+ query_list, include_all_devices, include_deleted_devices,
)
for user_id, device_keys in iteritems(results):
@@ -88,10 +94,19 @@ class EndToEndKeyStore(SQLBaseStore):
defer.returnValue(results)
- def _get_e2e_device_keys_txn(self, txn, query_list, include_all_devices):
+ def _get_e2e_device_keys_txn(
+ self, txn, query_list, include_all_devices=False,
+ include_deleted_devices=False,
+ ):
query_clauses = []
query_params = []
+ if include_all_devices is False:
+ include_deleted_devices = False
+
+ if include_deleted_devices:
+ deleted_devices = set(query_list)
+
for (user_id, device_id) in query_list:
query_clause = "user_id = ?"
query_params.append(user_id)
@@ -119,8 +134,14 @@ class EndToEndKeyStore(SQLBaseStore):
result = {}
for row in rows:
+ if include_deleted_devices:
+ deleted_devices.remove((row["user_id"], row["device_id"]))
result.setdefault(row["user_id"], {})[row["device_id"]] = row
+ if include_deleted_devices:
+ for user_id, device_id in deleted_devices:
+ result.setdefault(user_id, {})[device_id] = None
+
return result
@defer.inlineCallbacks
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 8d366d1b91..5d3ee90017 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -23,6 +23,7 @@ from unpaddedbase64 import encode_base64
from twisted.internet import defer
from synapse.api.errors import StoreError
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.events import EventsWorkerStore
from synapse.storage.signatures import SignatureWorkerStore
@@ -113,9 +114,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
sql = (
"SELECT b.event_id, MAX(e.depth) FROM events as e"
" INNER JOIN event_edges as g"
- " ON g.event_id = e.event_id AND g.room_id = e.room_id"
+ " ON g.event_id = e.event_id"
" INNER JOIN event_backward_extremities as b"
- " ON g.prev_event_id = b.event_id AND g.room_id = b.room_id"
+ " ON g.prev_event_id = b.event_id"
" WHERE b.room_id = ? AND g.is_state is ?"
" GROUP BY b.event_id"
)
@@ -329,8 +330,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
"SELECT depth, prev_event_id FROM event_edges"
" INNER JOIN events"
" ON prev_event_id = events.event_id"
- " AND event_edges.room_id = events.room_id"
- " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+ " WHERE event_edges.event_id = ?"
" AND event_edges.is_state = ?"
" LIMIT ?"
)
@@ -364,7 +364,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
txn.execute(
query,
- (room_id, event_id, False, limit - len(event_results))
+ (event_id, False, limit - len(event_results))
)
for row in txn:
@@ -401,7 +401,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
query = (
"SELECT prev_event_id FROM event_edges "
- "WHERE room_id = ? AND event_id = ? AND is_state = ? "
+ "WHERE event_id = ? AND is_state = ? "
"LIMIT ?"
)
@@ -410,7 +410,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
for event_id in front:
txn.execute(
query,
- (room_id, event_id, False, limit - len(event_results))
+ (event_id, False, limit - len(event_results))
)
for e_id, in txn:
@@ -446,7 +446,7 @@ class EventFederationStore(EventFederationWorkerStore):
)
hs.get_clock().looping_call(
- self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+ self._delete_old_forward_extrem_cache, 60 * 60 * 1000,
)
def _update_min_depth_for_room_txn(self, txn, room_id, depth):
@@ -548,9 +548,11 @@ class EventFederationStore(EventFederationWorkerStore):
sql,
(self.stream_ordering_month_ago, self.stream_ordering_month_ago,)
)
- return self.runInteraction(
+ return run_as_background_process(
+ "delete_old_forward_extrem_cache",
+ self.runInteraction,
"_delete_old_forward_extrem_cache",
- _delete_old_forward_extrem_cache_txn
+ _delete_old_forward_extrem_cache_txn,
)
def clean_room_for_join(self, room_id):
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 29b511ae5e..6840320641 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -22,6 +22,7 @@ from canonicaljson import json
from twisted.internet import defer
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import LoggingTransaction, SQLBaseStore
from synapse.util.caches.descriptors import cachedInlineCallbacks
@@ -458,11 +459,12 @@ class EventPushActionsWorkerStore(SQLBaseStore):
"Error removing push actions after event persistence failure",
)
- @defer.inlineCallbacks
def _find_stream_orderings_for_times(self):
- yield self.runInteraction(
+ return run_as_background_process(
+ "event_push_action_stream_orderings",
+ self.runInteraction,
"_find_stream_orderings_for_times",
- self._find_stream_orderings_for_times_txn
+ self._find_stream_orderings_for_times_txn,
)
def _find_stream_orderings_for_times_txn(self, txn):
@@ -604,7 +606,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
self._doing_notif_rotation = False
self._rotate_notif_loop = self._clock.looping_call(
- self._rotate_notifs, 30 * 60 * 1000
+ self._start_rotate_notifs, 30 * 60 * 1000,
)
def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
@@ -787,6 +789,9 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
""", (room_id, user_id, stream_ordering))
+ def _start_rotate_notifs(self):
+ return run_as_background_process("rotate_notifs", self._rotate_notifs)
+
@defer.inlineCallbacks
def _rotate_notifs(self):
if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2aaab0d02c..2f482af3a1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -19,7 +19,7 @@ import logging
from collections import OrderedDict, deque, namedtuple
from functools import wraps
-from six import iteritems, itervalues
+from six import iteritems
from six.moves import range
from canonicaljson import json
@@ -33,6 +33,7 @@ from synapse.api.errors import SynapseError
# these are only included to make the type annotations work
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
@@ -141,25 +142,22 @@ class _EventPeristenceQueue(object):
try:
queue = self._get_drainining_queue(room_id)
for item in queue:
- # handle_queue_loop runs in the sentinel logcontext, so
- # there is no need to preserve_fn when running the
- # callbacks on the deferred.
try:
ret = yield per_item_callback(item)
- item.deferred.callback(ret)
except Exception:
- item.deferred.errback()
+ with PreserveLoggingContext():
+ item.deferred.errback()
+ else:
+ with PreserveLoggingContext():
+ item.deferred.callback(ret)
finally:
queue = self._event_persist_queues.pop(room_id, None)
if queue:
self._event_persist_queues[room_id] = queue
self._currently_persisting_rooms.discard(room_id)
- # set handle_queue_loop off on the background. We don't want to
- # attribute work done in it to the current request, so we drop the
- # logcontext altogether.
- with PreserveLoggingContext():
- handle_queue_loop()
+ # set handle_queue_loop off in the background
+ run_as_background_process("persist_events", handle_queue_loop)
def _get_drainining_queue(self, room_id):
queue = self._event_persist_queues.setdefault(room_id, deque())
@@ -345,11 +343,14 @@ class EventsStore(EventsWorkerStore):
new_forward_extremeties = {}
# map room_id->(type,state_key)->event_id tracking the full
- # state in each room after adding these events
+ # state in each room after adding these events.
+ # This is simply used to prefill the get_current_state_ids
+ # cache
current_state_for_room = {}
- # map room_id->(to_delete, to_insert) where each entry is
- # a map (type,key)->event_id giving the state delta in each
+ # map room_id->(to_delete, to_insert) where to_delete is a list
+ # of type/state keys to remove from current state, and to_insert
+ # is a map (type,key)->event_id giving the state delta in each
# room
state_delta_for_room = {}
@@ -419,19 +420,40 @@ class EventsStore(EventsWorkerStore):
logger.info(
"Calculating state delta for room %s", room_id,
)
- current_state = yield self._get_new_state_after_events(
- room_id,
- ev_ctx_rm,
- latest_event_ids,
- new_latest_event_ids,
- )
+ with Measure(
+ self._clock,
+ "persist_events.get_new_state_after_events",
+ ):
+ res = yield self._get_new_state_after_events(
+ room_id,
+ ev_ctx_rm,
+ latest_event_ids,
+ new_latest_event_ids,
+ )
+ current_state, delta_ids = res
+
+ # If either are not None then there has been a change,
+ # and we need to work out the delta (or use that
+ # given)
+ if delta_ids is not None:
+ # If there is a delta we know that we've
+ # only added or replaced state, never
+ # removed keys entirely.
+ state_delta_for_room[room_id] = ([], delta_ids)
+ elif current_state is not None:
+ with Measure(
+ self._clock,
+ "persist_events.calculate_state_delta",
+ ):
+ delta = yield self._calculate_state_delta(
+ room_id, current_state,
+ )
+ state_delta_for_room[room_id] = delta
+
+ # If we have the current_state then lets prefill
+ # the cache with it.
if current_state is not None:
current_state_for_room[room_id] = current_state
- delta = yield self._calculate_state_delta(
- room_id, current_state,
- )
- if delta is not None:
- state_delta_for_room[room_id] = delta
yield self.runInteraction(
"persist_events",
@@ -498,7 +520,6 @@ class EventsStore(EventsWorkerStore):
iterable=list(new_latest_event_ids),
retcols=["prev_event_id"],
keyvalues={
- "room_id": room_id,
"is_state": False,
},
desc="_calculate_new_extremeties",
@@ -530,9 +551,15 @@ class EventsStore(EventsWorkerStore):
the new forward extremities for the room.
Returns:
- Deferred[dict[(str,str), str]|None]:
- None if there are no changes to the room state, or
- a dict of (type, state_key) -> event_id].
+ Deferred[tuple[dict[(str,str), str]|None, dict[(str,str), str]|None]]:
+ Returns a tuple of two state maps, the first being the full new current
+ state and the second being the delta to the existing current state.
+ If both are None then there has been no change.
+
+ If there has been a change then we only return the delta if its
+ already been calculated. Conversely if we do know the delta then
+ the new current state is only returned if we've already calculated
+ it.
"""
if not new_latest_event_ids:
@@ -540,18 +567,32 @@ class EventsStore(EventsWorkerStore):
# map from state_group to ((type, key) -> event_id) state map
state_groups_map = {}
+
+ # Map from (prev state group, new state group) -> delta state dict
+ state_group_deltas = {}
+
for ev, ctx in events_context:
if ctx.state_group is None:
- # I don't think this can happen, but let's double-check
- raise Exception(
- "Context for new extremity event %s has no state "
- "group" % (ev.event_id, ),
- )
+ # This should only happen for outlier events.
+ if not ev.internal_metadata.is_outlier():
+ raise Exception(
+ "Context for new event %s has no state "
+ "group" % (ev.event_id, ),
+ )
+ continue
if ctx.state_group in state_groups_map:
continue
- state_groups_map[ctx.state_group] = ctx.current_state_ids
+ # We're only interested in pulling out state that has already
+ # been cached in the context. We'll pull stuff out of the DB later
+ # if necessary.
+ current_state_ids = ctx.get_cached_current_state_ids()
+ if current_state_ids is not None:
+ state_groups_map[ctx.state_group] = current_state_ids
+
+ if ctx.prev_group:
+ state_group_deltas[(ctx.prev_group, ctx.state_group)] = ctx.delta_ids
# We need to map the event_ids to their state groups. First, let's
# check if the event is one we're persisting, in which case we can
@@ -566,7 +607,7 @@ class EventsStore(EventsWorkerStore):
for event_id in new_latest_event_ids:
# First search in the list of new events we're adding.
for ev, ctx in events_context:
- if event_id == ev.event_id:
+ if event_id == ev.event_id and ctx.state_group is not None:
event_id_to_state_group[event_id] = ctx.state_group
break
else:
@@ -594,7 +635,26 @@ class EventsStore(EventsWorkerStore):
# If they old and new groups are the same then we don't need to do
# anything.
if old_state_groups == new_state_groups:
- return
+ defer.returnValue((None, None))
+
+ if len(new_state_groups) == 1 and len(old_state_groups) == 1:
+ # If we're going from one state group to another, lets check if
+ # we have a delta for that transition. If we do then we can just
+ # return that.
+
+ new_state_group = next(iter(new_state_groups))
+ old_state_group = next(iter(old_state_groups))
+
+ delta_ids = state_group_deltas.get(
+ (old_state_group, new_state_group,), None
+ )
+ if delta_ids is not None:
+ # We have a delta from the existing to new current state,
+ # so lets just return that. If we happen to already have
+ # the current state in memory then lets also return that,
+ # but it doesn't matter if we don't.
+ new_state = state_groups_map.get(new_state_group)
+ defer.returnValue((new_state, delta_ids))
# Now that we have calculated new_state_groups we need to get
# their state IDs so we can resolve to a single state set.
@@ -606,7 +666,7 @@ class EventsStore(EventsWorkerStore):
if len(new_state_groups) == 1:
# If there is only one state group, then we know what the current
# state is.
- defer.returnValue(state_groups_map[new_state_groups.pop()])
+ defer.returnValue((state_groups_map[new_state_groups.pop()], None))
# Ok, we need to defer to the state handler to resolve our state sets.
@@ -625,7 +685,7 @@ class EventsStore(EventsWorkerStore):
room_id, state_groups, events_map, get_events
)
- defer.returnValue(res.state)
+ defer.returnValue((res.state, None))
@defer.inlineCallbacks
def _calculate_state_delta(self, room_id, current_state):
@@ -634,28 +694,20 @@ class EventsStore(EventsWorkerStore):
Assumes that we are only persisting events for one room at a time.
Returns:
- 2-tuple (to_delete, to_insert) where both are state dicts,
- i.e. (type, state_key) -> event_id. `to_delete` are the entries to
- first be deleted from current_state_events, `to_insert` are entries
- to insert.
+ tuple[list, dict] (to_delete, to_insert): where to_delete are the
+ type/state_keys to remove from current_state_events and `to_insert`
+ are the updates to current_state_events.
"""
existing_state = yield self.get_current_state_ids(room_id)
- existing_events = set(itervalues(existing_state))
- new_events = set(ev_id for ev_id in itervalues(current_state))
- changed_events = existing_events ^ new_events
-
- if not changed_events:
- return
+ to_delete = [
+ key for key in existing_state
+ if key not in current_state
+ ]
- to_delete = {
- key: ev_id for key, ev_id in iteritems(existing_state)
- if ev_id in changed_events
- }
- events_to_insert = (new_events - existing_events)
to_insert = {
key: ev_id for key, ev_id in iteritems(current_state)
- if ev_id in events_to_insert
+ if ev_id != existing_state.get(key)
}
defer.returnValue((to_delete, to_insert))
@@ -678,10 +730,10 @@ class EventsStore(EventsWorkerStore):
delete_existing (bool): True to purge existing table rows for the
events from the database. This is useful when retrying due to
IntegrityError.
- state_delta_for_room (dict[str, (list[str], list[str])]):
+ state_delta_for_room (dict[str, (list, dict)]):
The current-state delta for each room. For each room, a tuple
- (to_delete, to_insert), being a list of event ids to be removed
- from the current state, and a list of event ids to be added to
+ (to_delete, to_insert), being a list of type/state keys to be
+ removed from the current state, and a state set to be added to
the current state.
new_forward_extremeties (dict[str, list[str]]):
The new forward extremities for each room. For each room, a
@@ -759,9 +811,46 @@ class EventsStore(EventsWorkerStore):
def _update_current_state_txn(self, txn, state_delta_by_room, max_stream_order):
for room_id, current_state_tuple in iteritems(state_delta_by_room):
to_delete, to_insert = current_state_tuple
+
+ # First we add entries to the current_state_delta_stream. We
+ # do this before updating the current_state_events table so
+ # that we can use it to calculate the `prev_event_id`. (This
+ # allows us to not have to pull out the existing state
+ # unnecessarily).
+ sql = """
+ INSERT INTO current_state_delta_stream
+ (stream_id, room_id, type, state_key, event_id, prev_event_id)
+ SELECT ?, ?, ?, ?, ?, (
+ SELECT event_id FROM current_state_events
+ WHERE room_id = ? AND type = ? AND state_key = ?
+ )
+ """
+ txn.executemany(sql, (
+ (
+ max_stream_order, room_id, etype, state_key, None,
+ room_id, etype, state_key,
+ )
+ for etype, state_key in to_delete
+ # We sanity check that we're deleting rather than updating
+ if (etype, state_key) not in to_insert
+ ))
+ txn.executemany(sql, (
+ (
+ max_stream_order, room_id, etype, state_key, ev_id,
+ room_id, etype, state_key,
+ )
+ for (etype, state_key), ev_id in iteritems(to_insert)
+ ))
+
+ # Now we actually update the current_state_events table
+
txn.executemany(
- "DELETE FROM current_state_events WHERE event_id = ?",
- [(ev_id,) for ev_id in itervalues(to_delete)],
+ "DELETE FROM current_state_events"
+ " WHERE room_id = ? AND type = ? AND state_key = ?",
+ (
+ (room_id, etype, state_key)
+ for etype, state_key in itertools.chain(to_delete, to_insert)
+ ),
)
self._simple_insert_many_txn(
@@ -778,25 +867,6 @@ class EventsStore(EventsWorkerStore):
],
)
- state_deltas = {key: None for key in to_delete}
- state_deltas.update(to_insert)
-
- self._simple_insert_many_txn(
- txn,
- table="current_state_delta_stream",
- values=[
- {
- "stream_id": max_stream_order,
- "room_id": room_id,
- "type": key[0],
- "state_key": key[1],
- "event_id": ev_id,
- "prev_event_id": to_delete.get(key, None),
- }
- for key, ev_id in iteritems(state_deltas)
- ]
- )
-
txn.call_after(
self._curr_state_delta_stream_cache.entity_has_changed,
room_id, max_stream_order,
@@ -810,7 +880,8 @@ class EventsStore(EventsWorkerStore):
# and which we have added, then we invlidate the caches for all
# those users.
members_changed = set(
- state_key for ev_type, state_key in state_deltas
+ state_key
+ for ev_type, state_key in itertools.chain(to_delete, to_insert)
if ev_type == EventTypes.Member
)
@@ -1066,7 +1137,7 @@ class EventsStore(EventsWorkerStore):
):
txn.executemany(
"DELETE FROM %s WHERE room_id = ? AND event_id = ?" % (table,),
- [(ev.event_id,) for ev, _ in events_and_contexts]
+ [(ev.room_id, ev.event_id) for ev, _ in events_and_contexts]
)
def _store_event_txn(self, txn, events_and_contexts):
@@ -1117,7 +1188,6 @@ class EventsStore(EventsWorkerStore):
"type": event.type,
"processed": True,
"outlier": event.internal_metadata.is_outlier(),
- "content": encode_json(event.content).decode("UTF-8"),
"origin_server_ts": int(event.origin_server_ts),
"received_ts": self._clock.time_msec(),
"sender": event.sender,
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 67433606c6..f28239a808 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -25,6 +25,7 @@ from synapse.events import EventBase # noqa: F401
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import (
LoggingContext,
PreserveLoggingContext,
@@ -322,10 +323,11 @@ class EventsWorkerStore(SQLBaseStore):
should_start = False
if should_start:
- with PreserveLoggingContext():
- self.runWithConnection(
- self._do_fetch
- )
+ run_as_background_process(
+ "fetch_events",
+ self.runWithConnection,
+ self._do_fetch,
+ )
logger.debug("Loading %d events", len(events))
with PreserveLoggingContext():
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index be655d287b..6a5028961d 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -21,7 +21,6 @@ from canonicaljson import json
from twisted.internet import defer
-from synapse.api.constants import EventTypes
from synapse.push.baserules import list_with_base_rules
from synapse.storage.appservice import ApplicationServiceWorkerStore
from synapse.storage.pusher import PusherWorkerStore
@@ -186,6 +185,7 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
defer.returnValue(results)
+ @defer.inlineCallbacks
def bulk_get_push_rules_for_room(self, event, context):
state_group = context.state_group
if not state_group:
@@ -195,9 +195,11 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
# To do this we set the state_group to a new object as object() != object()
state_group = object()
- return self._bulk_get_push_rules_for_room(
- event.room_id, state_group, context.current_state_ids, event=event
+ current_state_ids = yield context.get_current_state_ids(self)
+ result = yield self._bulk_get_push_rules_for_room(
+ event.room_id, state_group, current_state_ids, event=event
)
+ defer.returnValue(result)
@cachedInlineCallbacks(num_args=2, cache_context=True)
def _bulk_get_push_rules_for_room(self, room_id, state_group, current_state_ids,
@@ -247,18 +249,6 @@ class PushRulesWorkerStore(ApplicationServiceWorkerStore,
if uid in local_users_in_room:
user_ids.add(uid)
- forgotten = yield self.who_forgot_in_room(
- event.room_id, on_invalidate=cache_context.invalidate,
- )
-
- for row in forgotten:
- user_id = row["user_id"]
- event_id = row["event_id"]
-
- mem_id = current_state_ids.get((EventTypes.Member, user_id), None)
- if event_id == mem_id:
- user_ids.discard(user_id)
-
rules_by_user = yield self.bulk_get_push_rules(
user_ids, on_invalidate=cache_context.invalidate,
)
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index cc273a57b2..8443bd4c1b 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -233,7 +233,7 @@ class PusherStore(PusherWorkerStore):
)
if newly_inserted:
- self.runInteraction(
+ yield self.runInteraction(
"add_pusher",
self._invalidate_cache_and_stream,
self.get_if_user_has_pusher, (user_id,)
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 02a802bed9..027bf8c85e 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -232,6 +232,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
defer.returnValue(user_who_share_room)
+ @defer.inlineCallbacks
def get_joined_users_from_context(self, event, context):
state_group = context.state_group
if not state_group:
@@ -241,11 +242,13 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# To do this we set the state_group to a new object as object() != object()
state_group = object()
- return self._get_joined_users_from_context(
- event.room_id, state_group, context.current_state_ids,
+ current_state_ids = yield context.get_current_state_ids(self)
+ result = yield self._get_joined_users_from_context(
+ event.room_id, state_group, current_state_ids,
event=event,
context=context,
)
+ defer.returnValue(result)
def get_joined_users_from_state(self, room_id, state_entry):
state_group = state_entry.state_group
@@ -458,17 +461,29 @@ class RoomMemberWorkerStore(EventsWorkerStore):
def _get_joined_hosts_cache(self, room_id):
return _JoinedHostsCache(self, room_id)
- @cached()
- def who_forgot_in_room(self, room_id):
- return self._simple_select_list(
- table="room_memberships",
- retcols=("user_id", "event_id"),
- keyvalues={
- "room_id": room_id,
- "forgotten": 1,
- },
- desc="who_forgot"
- )
+ @cachedInlineCallbacks(num_args=2)
+ def did_forget(self, user_id, room_id):
+ """Returns whether user_id has elected to discard history for room_id.
+
+ Returns False if they have since re-joined."""
+ def f(txn):
+ sql = (
+ "SELECT"
+ " COUNT(*)"
+ " FROM"
+ " room_memberships"
+ " WHERE"
+ " user_id = ?"
+ " AND"
+ " room_id = ?"
+ " AND"
+ " forgotten = 0"
+ )
+ txn.execute(sql, (user_id, room_id))
+ rows = txn.fetchall()
+ return rows[0][0]
+ count = yield self.runInteraction("did_forget_membership", f)
+ defer.returnValue(count == 0)
class RoomMemberStore(RoomMemberWorkerStore):
@@ -577,36 +592,11 @@ class RoomMemberStore(RoomMemberWorkerStore):
)
txn.execute(sql, (user_id, room_id))
- txn.call_after(self.did_forget.invalidate, (user_id, room_id))
self._invalidate_cache_and_stream(
- txn, self.who_forgot_in_room, (room_id,)
+ txn, self.did_forget, (user_id, room_id,),
)
return self.runInteraction("forget_membership", f)
- @cachedInlineCallbacks(num_args=2)
- def did_forget(self, user_id, room_id):
- """Returns whether user_id has elected to discard history for room_id.
-
- Returns False if they have since re-joined."""
- def f(txn):
- sql = (
- "SELECT"
- " COUNT(*)"
- " FROM"
- " room_memberships"
- " WHERE"
- " user_id = ?"
- " AND"
- " room_id = ?"
- " AND"
- " forgotten = 0"
- )
- txn.execute(sql, (user_id, room_id))
- rows = txn.fetchall()
- return rows[0][0]
- count = yield self.runInteraction("did_forget_membership", f)
- defer.returnValue(count == 0)
-
@defer.inlineCallbacks
def _background_add_membership_profile(self, progress, batch_size):
target_min_stream_id = progress.get(
diff --git a/synapse/storage/schema/delta/50/make_event_content_nullable.py b/synapse/storage/schema/delta/50/make_event_content_nullable.py
new file mode 100644
index 0000000000..7d27342e39
--- /dev/null
+++ b/synapse/storage/schema/delta/50/make_event_content_nullable.py
@@ -0,0 +1,92 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+We want to stop populating 'event.content', so we need to make it nullable.
+
+If this has to be rolled back, then the following should populate the missing data:
+
+Postgres:
+
+ UPDATE events SET content=(ej.json::json)->'content' FROM event_json ej
+ WHERE ej.event_id = events.event_id AND
+ stream_ordering < (
+ SELECT stream_ordering FROM events WHERE content IS NOT NULL
+ ORDER BY stream_ordering LIMIT 1
+ );
+
+ UPDATE events SET content=(ej.json::json)->'content' FROM event_json ej
+ WHERE ej.event_id = events.event_id AND
+ stream_ordering > (
+ SELECT stream_ordering FROM events WHERE content IS NOT NULL
+ ORDER BY stream_ordering DESC LIMIT 1
+ );
+
+SQLite:
+
+ UPDATE events SET content=(
+ SELECT json_extract(json,'$.content') FROM event_json ej
+ WHERE ej.event_id = events.event_id
+ )
+ WHERE
+ stream_ordering < (
+ SELECT stream_ordering FROM events WHERE content IS NOT NULL
+ ORDER BY stream_ordering LIMIT 1
+ )
+ OR stream_ordering > (
+ SELECT stream_ordering FROM events WHERE content IS NOT NULL
+ ORDER BY stream_ordering DESC LIMIT 1
+ );
+
+"""
+
+import logging
+
+from synapse.storage.engines import PostgresEngine
+
+logger = logging.getLogger(__name__)
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+ pass
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+ if isinstance(database_engine, PostgresEngine):
+ cur.execute("""
+ ALTER TABLE events ALTER COLUMN content DROP NOT NULL;
+ """)
+ return
+
+ # sqlite is an arse about this. ref: https://www.sqlite.org/lang_altertable.html
+
+ cur.execute("SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'")
+ (oldsql,) = cur.fetchone()
+
+ sql = oldsql.replace("content TEXT NOT NULL", "content TEXT")
+ if sql == oldsql:
+ raise Exception("Couldn't find null constraint to drop in %s" % oldsql)
+
+ logger.info("Replacing definition of 'events' with: %s", sql)
+
+ cur.execute("PRAGMA schema_version")
+ (oldver,) = cur.fetchone()
+ cur.execute("PRAGMA writable_schema=ON")
+ cur.execute(
+ "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'",
+ (sql, ),
+ )
+ cur.execute("PRAGMA schema_version=%i" % (oldver+1,))
+ cur.execute("PRAGMA writable_schema=OFF")
diff --git a/synapse/storage/schema/full_schemas/16/event_edges.sql b/synapse/storage/schema/full_schemas/16/event_edges.sql
index 52eec88357..6b5a5a88fa 100644
--- a/synapse/storage/schema/full_schemas/16/event_edges.sql
+++ b/synapse/storage/schema/full_schemas/16/event_edges.sql
@@ -37,7 +37,8 @@ CREATE TABLE IF NOT EXISTS event_edges(
event_id TEXT NOT NULL,
prev_event_id TEXT NOT NULL,
room_id TEXT NOT NULL,
- is_state BOOL NOT NULL,
+ is_state BOOL NOT NULL, -- true if this is a prev_state edge rather than a regular
+ -- event dag edge.
UNIQUE (event_id, prev_event_id, room_id, is_state)
);
diff --git a/synapse/storage/schema/full_schemas/16/im.sql b/synapse/storage/schema/full_schemas/16/im.sql
index ba5346806e..5f5cb8d01d 100644
--- a/synapse/storage/schema/full_schemas/16/im.sql
+++ b/synapse/storage/schema/full_schemas/16/im.sql
@@ -19,7 +19,12 @@ CREATE TABLE IF NOT EXISTS events(
event_id TEXT NOT NULL,
type TEXT NOT NULL,
room_id TEXT NOT NULL,
- content TEXT NOT NULL,
+
+ -- 'content' used to be created NULLable, but as of delta 50 we drop that constraint.
+ -- the hack we use to drop the constraint doesn't work for an in-memory sqlite
+ -- database, which breaks the sytests. Hence, we no longer make it nullable.
+ content TEXT,
+
unrecognized_keys TEXT,
processed BOOL NOT NULL,
outlier BOOL NOT NULL,
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 89a05c4618..b27b3ae144 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -186,7 +186,17 @@ class StateGroupWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def _get_state_groups_from_groups(self, groups, types):
- """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
+ """Returns the state groups for a given set of groups, filtering on
+ types of state events.
+
+ Args:
+ groups(list[int]): list of state group IDs to query
+ types (Iterable[str, str|None]|None): list of 2-tuples of the form
+ (`type`, `state_key`), where a `state_key` of `None` matches all
+ state_keys for the `type`. If None, all types are returned.
+
+ Returns:
+ dictionary state_group -> (dict of (type, state_key) -> event id)
"""
results = {}
@@ -200,8 +210,11 @@ class StateGroupWorkerStore(SQLBaseStore):
defer.returnValue(results)
- def _get_state_groups_from_groups_txn(self, txn, groups, types=None):
+ def _get_state_groups_from_groups_txn(
+ self, txn, groups, types=None,
+ ):
results = {group: {} for group in groups}
+
if types is not None:
types = list(set(types)) # deduplicate types list
@@ -239,7 +252,7 @@ class StateGroupWorkerStore(SQLBaseStore):
# Turns out that postgres doesn't like doing a list of OR's and
# is about 1000x slower, so we just issue a query for each specific
# type seperately.
- if types:
+ if types is not None:
clause_to_args = [
(
"AND type = ? AND state_key = ?",
@@ -278,6 +291,7 @@ class StateGroupWorkerStore(SQLBaseStore):
else:
where_clauses.append("(type = ? AND state_key = ?)")
where_args.extend([typ[0], typ[1]])
+
where_clause = "AND (%s)" % (" OR ".join(where_clauses))
else:
where_clause = ""
@@ -332,16 +346,20 @@ class StateGroupWorkerStore(SQLBaseStore):
return results
@defer.inlineCallbacks
- def get_state_for_events(self, event_ids, types):
+ def get_state_for_events(self, event_ids, types, filtered_types=None):
"""Given a list of event_ids and type tuples, return a list of state
dicts for each event. The state dicts will only have the type/state_keys
that are in the `types` list.
Args:
- event_ids (list)
- types (list): List of (type, state_key) tuples which are used to
- filter the state fetched. `state_key` may be None, which matches
- any `state_key`
+ event_ids (list[string])
+ types (list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ May be None, which matches any key.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
deferred: A list of dicts corresponding to the event_ids given.
@@ -352,7 +370,7 @@ class StateGroupWorkerStore(SQLBaseStore):
)
groups = set(itervalues(event_to_groups))
- group_to_state = yield self._get_state_for_groups(groups, types)
+ group_to_state = yield self._get_state_for_groups(groups, types, filtered_types)
state_event_map = yield self.get_events(
[ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
@@ -371,15 +389,19 @@ class StateGroupWorkerStore(SQLBaseStore):
defer.returnValue({event: event_to_state[event] for event in event_ids})
@defer.inlineCallbacks
- def get_state_ids_for_events(self, event_ids, types=None):
+ def get_state_ids_for_events(self, event_ids, types=None, filtered_types=None):
"""
Get the state dicts corresponding to a list of events
Args:
event_ids(list(str)): events whose state should be returned
- types(list[(str, str)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. May be None, which
- matches any key
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ May be None, which matches any key.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
A deferred dict from event_id -> (type, state_key) -> state_event
@@ -389,7 +411,7 @@ class StateGroupWorkerStore(SQLBaseStore):
)
groups = set(itervalues(event_to_groups))
- group_to_state = yield self._get_state_for_groups(groups, types)
+ group_to_state = yield self._get_state_for_groups(groups, types, filtered_types)
event_to_state = {
event_id: group_to_state[group]
@@ -399,37 +421,45 @@ class StateGroupWorkerStore(SQLBaseStore):
defer.returnValue({event: event_to_state[event] for event in event_ids})
@defer.inlineCallbacks
- def get_state_for_event(self, event_id, types=None):
+ def get_state_for_event(self, event_id, types=None, filtered_types=None):
"""
Get the state dict corresponding to a particular event
Args:
event_id(str): event whose state should be returned
- types(list[(str, str)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. May be None, which
- matches any key
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ May be None, which matches any key.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
A deferred dict from (type, state_key) -> state_event
"""
- state_map = yield self.get_state_for_events([event_id], types)
+ state_map = yield self.get_state_for_events([event_id], types, filtered_types)
defer.returnValue(state_map[event_id])
@defer.inlineCallbacks
- def get_state_ids_for_event(self, event_id, types=None):
+ def get_state_ids_for_event(self, event_id, types=None, filtered_types=None):
"""
Get the state dict corresponding to a particular event
Args:
event_id(str): event whose state should be returned
- types(list[(str, str)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. May be None, which
- matches any key
+ types(list[(str, str|None)]|None): List of (type, state_key) tuples
+ which are used to filter the state fetched. If `state_key` is None,
+ all events are returned of the given type.
+ May be None, which matches any key.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
A deferred dict from (type, state_key) -> state_event
"""
- state_map = yield self.get_state_ids_for_events([event_id], types)
+ state_map = yield self.get_state_ids_for_events([event_id], types, filtered_types)
defer.returnValue(state_map[event_id])
@cached(max_entries=50000)
@@ -460,56 +490,73 @@ class StateGroupWorkerStore(SQLBaseStore):
defer.returnValue({row["event_id"]: row["state_group"] for row in rows})
- def _get_some_state_from_cache(self, group, types):
+ def _get_some_state_from_cache(self, group, types, filtered_types=None):
"""Checks if group is in cache. See `_get_state_for_groups`
- Returns 3-tuple (`state_dict`, `missing_types`, `got_all`).
- `missing_types` is the list of types that aren't in the cache for that
- group. `got_all` is a bool indicating if we successfully retrieved all
+ Args:
+ group(int): The state group to lookup
+ types(list[str, str|None]): List of 2-tuples of the form
+ (`type`, `state_key`), where a `state_key` of `None` matches all
+ state_keys for the `type`.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
+
+ Returns 2-tuple (`state_dict`, `got_all`).
+ `got_all` is a bool indicating if we successfully retrieved all
requests state from the cache, if False we need to query the DB for the
missing state.
-
- Args:
- group: The state group to lookup
- types (list): List of 2-tuples of the form (`type`, `state_key`),
- where a `state_key` of `None` matches all state_keys for the
- `type`.
"""
is_all, known_absent, state_dict_ids = self._state_group_cache.get(group)
type_to_key = {}
- missing_types = set()
+
+ # tracks whether any of ourrequested types are missing from the cache
+ missing_types = False
for typ, state_key in types:
key = (typ, state_key)
- if state_key is None:
+
+ if (
+ state_key is None or
+ (filtered_types is not None and typ not in filtered_types)
+ ):
type_to_key[typ] = None
- missing_types.add(key)
+ # we mark the type as missing from the cache because
+ # when the cache was populated it might have been done with a
+ # restricted set of state_keys, so the wildcard will not work
+ # and the cache may be incomplete.
+ missing_types = True
else:
if type_to_key.get(typ, object()) is not None:
type_to_key.setdefault(typ, set()).add(state_key)
if key not in state_dict_ids and key not in known_absent:
- missing_types.add(key)
+ missing_types = True
sentinel = object()
def include(typ, state_key):
valid_state_keys = type_to_key.get(typ, sentinel)
if valid_state_keys is sentinel:
- return False
+ return filtered_types is not None and typ not in filtered_types
if valid_state_keys is None:
return True
if state_key in valid_state_keys:
return True
return False
- got_all = is_all or not missing_types
+ got_all = is_all
+ if not got_all:
+ # the cache is incomplete. We may still have got all the results we need, if
+ # we don't have any wildcards in the match list.
+ if not missing_types and filtered_types is None:
+ got_all = True
return {
k: v for k, v in iteritems(state_dict_ids)
if include(k[0], k[1])
- }, missing_types, got_all
+ }, got_all
def _get_all_state_from_cache(self, group):
"""Checks if group is in cache. See `_get_state_for_groups`
@@ -526,7 +573,7 @@ class StateGroupWorkerStore(SQLBaseStore):
return state_dict_ids, is_all
@defer.inlineCallbacks
- def _get_state_for_groups(self, groups, types=None):
+ def _get_state_for_groups(self, groups, types=None, filtered_types=None):
"""Gets the state at each of a list of state groups, optionally
filtering by type/state_key
@@ -540,6 +587,9 @@ class StateGroupWorkerStore(SQLBaseStore):
Otherwise, each entry should be a `(type, state_key)` tuple to
include in the response. A `state_key` of None is a wildcard
meaning that we require all state with that type.
+ filtered_types(list[str]|None): Only apply filtering via `types` to this
+ list of event types. Other types of events are returned unfiltered.
+ If None, `types` filtering is applied to all events.
Returns:
Deferred[dict[int, dict[(type, state_key), EventBase]]]
@@ -551,8 +601,8 @@ class StateGroupWorkerStore(SQLBaseStore):
missing_groups = []
if types is not None:
for group in set(groups):
- state_dict_ids, _, got_all = self._get_some_state_from_cache(
- group, types,
+ state_dict_ids, got_all = self._get_some_state_from_cache(
+ group, types, filtered_types
)
results[group] = state_dict_ids
@@ -579,13 +629,13 @@ class StateGroupWorkerStore(SQLBaseStore):
# cache. Hence, if we are doing a wildcard lookup, populate the
# cache fully so that we can do an efficient lookup next time.
- if types and any(k is None for (t, k) in types):
+ if filtered_types or (types and any(k is None for (t, k) in types)):
types_to_fetch = None
else:
types_to_fetch = types
group_to_state_dict = yield self._get_state_groups_from_groups(
- missing_groups, types_to_fetch,
+ missing_groups, types_to_fetch
)
for group, group_state_dict in iteritems(group_to_state_dict):
@@ -595,7 +645,10 @@ class StateGroupWorkerStore(SQLBaseStore):
if types:
for k, v in iteritems(group_state_dict):
(typ, _) = k
- if k in types or (typ, None) in types:
+ if (
+ (k in types or (typ, None) in types) or
+ (filtered_types and typ not in filtered_types)
+ ):
state_dict[k] = v
else:
state_dict.update(group_state_dict)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index c3bc94f56d..428e7fa36e 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -22,6 +22,7 @@ from canonicaljson import encode_canonical_json, json
from twisted.internet import defer
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore
@@ -57,7 +58,7 @@ class TransactionStore(SQLBaseStore):
def __init__(self, db_conn, hs):
super(TransactionStore, self).__init__(db_conn, hs)
- self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)
+ self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have
@@ -271,6 +272,11 @@ class TransactionStore(SQLBaseStore):
txn.execute(query, (self._clock.time_msec(),))
return self.cursor_to_dict(txn)
+ def _start_cleanup_transactions(self):
+ return run_as_background_process(
+ "cleanup_transactions", self._cleanup_transactions,
+ )
+
def _cleanup_transactions(self):
now = self._clock.time_msec()
month_ago = now - 30 * 24 * 60 * 60 * 1000
diff --git a/synapse/util/async.py b/synapse/util/async.py
index 5d0fb39130..a7094e2fb4 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,7 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import collections
import logging
from contextlib import contextmanager
@@ -156,54 +157,72 @@ def concurrently_execute(func, args, limit):
class Linearizer(object):
- """Linearizes access to resources based on a key. Useful to ensure only one
- thing is happening at a time on a given resource.
+ """Limits concurrent access to resources based on a key. Useful to ensure
+ only a few things happen at a time on a given resource.
Example:
- with (yield linearizer.queue("test_key")):
+ with (yield limiter.queue("test_key")):
# do some work.
"""
- def __init__(self, name=None, clock=None):
+ def __init__(self, name=None, max_count=1, clock=None):
+ """
+ Args:
+ max_count(int): The maximum number of concurrent accesses
+ """
if name is None:
self.name = id(self)
else:
self.name = name
- self.key_to_defer = {}
if not clock:
from twisted.internet import reactor
clock = Clock(reactor)
self._clock = clock
+ self.max_count = max_count
+
+ # key_to_defer is a map from the key to a 2 element list where
+ # the first element is the number of things executing, and
+ # the second element is an OrderedDict, where the keys are deferreds for the
+ # things blocked from executing.
+ self.key_to_defer = {}
@defer.inlineCallbacks
def queue(self, key):
- # If there is already a deferred in the queue, we pull it out so that
- # we can wait on it later.
- # Then we replace it with a deferred that we resolve *after* the
- # context manager has exited.
- # We only return the context manager after the previous deferred has
- # resolved.
- # This all has the net effect of creating a chain of deferreds that
- # wait for the previous deferred before starting their work.
- current_defer = self.key_to_defer.get(key)
+ entry = self.key_to_defer.setdefault(key, [0, collections.OrderedDict()])
- new_defer = defer.Deferred()
- self.key_to_defer[key] = new_defer
+ # If the number of things executing is greater than the maximum
+ # then add a deferred to the list of blocked items
+ # When on of the things currently executing finishes it will callback
+ # this item so that it can continue executing.
+ if entry[0] >= self.max_count:
+ new_defer = defer.Deferred()
+ entry[1][new_defer] = 1
- if current_defer:
logger.info(
- "Waiting to acquire linearizer lock %r for key %r", self.name, key
+ "Waiting to acquire linearizer lock %r for key %r", self.name, key,
)
try:
- with PreserveLoggingContext():
- yield current_defer
- except Exception:
- logger.exception("Unexpected exception in Linearizer")
-
- logger.info("Acquired linearizer lock %r for key %r", self.name,
- key)
+ yield make_deferred_yieldable(new_defer)
+ except Exception as e:
+ if isinstance(e, CancelledError):
+ logger.info(
+ "Cancelling wait for linearizer lock %r for key %r",
+ self.name, key,
+ )
+ else:
+ logger.warn(
+ "Unexpected exception waiting for linearizer lock %r for key %r",
+ self.name, key,
+ )
+
+ # we just have to take ourselves back out of the queue.
+ del entry[1][new_defer]
+ raise
+
+ logger.info("Acquired linearizer lock %r for key %r", self.name, key)
+ entry[0] += 1
# if the code holding the lock completes synchronously, then it
# will recursively run the next claimant on the list. That can
@@ -213,15 +232,15 @@ class Linearizer(object):
# In order to break the cycle, we add a cheeky sleep(0) here to
# ensure that we fall back to the reactor between each iteration.
#
- # (There's no particular need for it to happen before we return
- # the context manager, but it needs to happen while we hold the
- # lock, and the context manager's exit code must be synchronous,
- # so actually this is the only sensible place.
+ # (This needs to happen while we hold the lock, and the context manager's exit
+ # code must be synchronous, so this is the only sensible place.)
yield self._clock.sleep(0)
else:
- logger.info("Acquired uncontended linearizer lock %r for key %r",
- self.name, key)
+ logger.info(
+ "Acquired uncontended linearizer lock %r for key %r", self.name, key,
+ )
+ entry[0] += 1
@contextmanager
def _ctx_manager():
@@ -229,73 +248,15 @@ class Linearizer(object):
yield
finally:
logger.info("Releasing linearizer lock %r for key %r", self.name, key)
- with PreserveLoggingContext():
- new_defer.callback(None)
- current_d = self.key_to_defer.get(key)
- if current_d is new_defer:
- self.key_to_defer.pop(key, None)
-
- defer.returnValue(_ctx_manager())
-
-
-class Limiter(object):
- """Limits concurrent access to resources based on a key. Useful to ensure
- only a few thing happen at a time on a given resource.
-
- Example:
-
- with (yield limiter.queue("test_key")):
- # do some work.
-
- """
- def __init__(self, max_count):
- """
- Args:
- max_count(int): The maximum number of concurrent access
- """
- self.max_count = max_count
-
- # key_to_defer is a map from the key to a 2 element list where
- # the first element is the number of things executing
- # the second element is a list of deferreds for the things blocked from
- # executing.
- self.key_to_defer = {}
-
- @defer.inlineCallbacks
- def queue(self, key):
- entry = self.key_to_defer.setdefault(key, [0, []])
-
- # If the number of things executing is greater than the maximum
- # then add a deferred to the list of blocked items
- # When on of the things currently executing finishes it will callback
- # this item so that it can continue executing.
- if entry[0] >= self.max_count:
- new_defer = defer.Deferred()
- entry[1].append(new_defer)
-
- logger.info("Waiting to acquire limiter lock for key %r", key)
- with PreserveLoggingContext():
- yield new_defer
- logger.info("Acquired limiter lock for key %r", key)
- else:
- logger.info("Acquired uncontended limiter lock for key %r", key)
-
- entry[0] += 1
-
- @contextmanager
- def _ctx_manager():
- try:
- yield
- finally:
- logger.info("Releasing limiter lock for key %r", key)
# We've finished executing so check if there are any things
# blocked waiting to execute and start one of them
entry[0] -= 1
if entry[1]:
- next_def = entry[1].pop(0)
+ (next_def, _) = entry[1].popitem(last=False)
+ # we need to run the next thing in the sentinel context.
with PreserveLoggingContext():
next_def.callback(None)
elif entry[0] == 0:
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 4abca91f6d..ce85b2ae11 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,6 +16,7 @@
import logging
from collections import OrderedDict
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache
logger = logging.getLogger(__name__)
@@ -63,7 +64,10 @@ class ExpiringCache(object):
return
def f():
- self._prune_cache()
+ return run_as_background_process(
+ "prune_cache_%s" % self._cache_name,
+ self._prune_cache,
+ )
self._clock.looping_call(f, self._expiry_ms / 2)
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index a1f8ff8f10..f2bde74dc5 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -80,12 +80,7 @@ class StreamChangeCache(object):
)
}
- # we need to include entities which we don't know about, as well as
- # those which are known to have changed since the stream pos.
- result = {
- e for e in entities
- if e in changed_entities or e not in self._entity_to_key
- }
+ result = changed_entities.intersection(entities)
self.metrics.inc_hits()
else:
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 734331caaa..194da87639 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -17,20 +17,18 @@ import logging
from twisted.internet import defer
-from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
def user_left_room(distributor, user, room_id):
- with PreserveLoggingContext():
- distributor.fire("user_left_room", user=user, room_id=room_id)
+ distributor.fire("user_left_room", user=user, room_id=room_id)
def user_joined_room(distributor, user, room_id):
- with PreserveLoggingContext():
- distributor.fire("user_joined_room", user=user, room_id=room_id)
+ distributor.fire("user_joined_room", user=user, room_id=room_id)
class Distributor(object):
@@ -44,9 +42,7 @@ class Distributor(object):
model will do for today.
"""
- def __init__(self, suppress_failures=True):
- self.suppress_failures = suppress_failures
-
+ def __init__(self):
self.signals = {}
self.pre_registration = {}
@@ -56,7 +52,6 @@ class Distributor(object):
self.signals[name] = Signal(
name,
- suppress_failures=self.suppress_failures,
)
if name in self.pre_registration:
@@ -75,10 +70,18 @@ class Distributor(object):
self.pre_registration[name].append(observer)
def fire(self, name, *args, **kwargs):
+ """Dispatches the given signal to the registered observers.
+
+ Runs the observers as a background process. Does not return a deferred.
+ """
if name not in self.signals:
raise KeyError("%r does not have a signal named %s" % (self, name))
- return self.signals[name].fire(*args, **kwargs)
+ run_as_background_process(
+ name,
+ self.signals[name].fire,
+ *args, **kwargs
+ )
class Signal(object):
@@ -91,9 +94,8 @@ class Signal(object):
method into all of the observers.
"""
- def __init__(self, name, suppress_failures):
+ def __init__(self, name):
self.name = name
- self.suppress_failures = suppress_failures
self.observers = []
def observe(self, observer):
@@ -103,7 +105,6 @@ class Signal(object):
Each observer callable may return a Deferred."""
self.observers.append(observer)
- @defer.inlineCallbacks
def fire(self, *args, **kwargs):
"""Invokes every callable in the observer list, passing in the args and
kwargs. Exceptions thrown by observers are logged but ignored. It is
@@ -121,22 +122,17 @@ class Signal(object):
failure.type,
failure.value,
failure.getTracebackObject()))
- if not self.suppress_failures:
- return failure
return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)
- with PreserveLoggingContext():
- deferreds = [
- do(observer)
- for observer in self.observers
- ]
-
- res = yield defer.gatherResults(
- deferreds, consumeErrors=True
- ).addErrback(unwrapFirstError)
+ deferreds = [
+ run_in_background(do, o)
+ for o in self.observers
+ ]
- defer.returnValue(res)
+ return make_deferred_yieldable(defer.gatherResults(
+ deferreds, consumeErrors=True,
+ ))
def __repr__(self):
return "<Signal name=%r>" % (self.name,)
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index f6c7175f74..8dcae50b39 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -99,6 +99,17 @@ class ContextResourceUsage(object):
self.db_sched_duration_sec = 0
self.evt_db_fetch_count = 0
+ def __repr__(self):
+ return ("<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
+ "db_txn_count='%r', db_txn_duration_sec='%r', "
+ "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>") % (
+ self.ru_stime,
+ self.ru_utime,
+ self.db_txn_count,
+ self.db_txn_duration_sec,
+ self.db_sched_duration_sec,
+ self.evt_db_fetch_count,)
+
def __iadd__(self, other):
"""Add another ContextResourceUsage's stats to this one's.
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 6ba7107896..97f1267380 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -104,12 +104,19 @@ class Measure(object):
logger.warn("Expected context. (%r)", self.name)
return
- usage = context.get_resource_usage() - self.start_usage
- block_ru_utime.labels(self.name).inc(usage.ru_utime)
- block_ru_stime.labels(self.name).inc(usage.ru_stime)
- block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
- block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
- block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
+ current = context.get_resource_usage()
+ usage = current - self.start_usage
+ try:
+ block_ru_utime.labels(self.name).inc(usage.ru_utime)
+ block_ru_stime.labels(self.name).inc(usage.ru_stime)
+ block_db_txn_count.labels(self.name).inc(usage.db_txn_count)
+ block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec)
+ block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec)
+ except ValueError:
+ logger.warn(
+ "Failed to save metrics! OLD: %r, NEW: %r",
+ self.start_usage, current
+ )
if self.created_context:
self.start_context.__exit__(exc_type, exc_val, exc_tb)
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 015c2bab37..d4680863d3 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -12,15 +12,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import itertools
+
import logging
import operator
+from six import iteritems, itervalues
+from six.moves import map
+
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.events.utils import prune_event
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
+from synapse.types import get_domain_from_id
logger = logging.getLogger(__name__)
@@ -72,19 +75,6 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
types=types,
)
- forgotten = yield make_deferred_yieldable(defer.gatherResults([
- defer.maybeDeferred(
- preserve_fn(store.who_forgot_in_room),
- room_id,
- )
- for room_id in frozenset(e.room_id for e in events)
- ], consumeErrors=True))
-
- # Set of membership event_ids that have been forgotten
- event_id_forgotten = frozenset(
- row["event_id"] for rows in forgotten for row in rows
- )
-
ignore_dict_content = yield store.get_global_account_data_by_type_for_user(
"m.ignored_user_list", user_id,
)
@@ -173,10 +163,7 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
if membership is None:
membership_event = state.get((EventTypes.Member, user_id), None)
if membership_event:
- # XXX why do we do this?
- # https://github.com/matrix-org/synapse/issues/3350
- if membership_event.event_id not in event_id_forgotten:
- membership = membership_event.membership
+ membership = membership_event.membership
# if the user was a member of the room at the time of the event,
# they can see it.
@@ -218,10 +205,161 @@ def filter_events_for_client(store, user_id, events, is_peeking=False,
return event
# check each event: gives an iterable[None|EventBase]
- filtered_events = itertools.imap(allowed, events)
+ filtered_events = map(allowed, events)
# remove the None entries
filtered_events = filter(operator.truth, filtered_events)
# we turn it into a list before returning it.
defer.returnValue(list(filtered_events))
+
+
+@defer.inlineCallbacks
+def filter_events_for_server(store, server_name, events):
+ # Whatever else we do, we need to check for senders which have requested
+ # erasure of their data.
+ erased_senders = yield store.are_users_erased(
+ e.sender for e in events,
+ )
+
+ def redact_disallowed(event, state):
+ # if the sender has been gdpr17ed, always return a redacted
+ # copy of the event.
+ if erased_senders[event.sender]:
+ logger.info(
+ "Sender of %s has been erased, redacting",
+ event.event_id,
+ )
+ return prune_event(event)
+
+ # state will be None if we decided we didn't need to filter by
+ # room membership.
+ if not state:
+ return event
+
+ history = state.get((EventTypes.RoomHistoryVisibility, ''), None)
+ if history:
+ visibility = history.content.get("history_visibility", "shared")
+ if visibility in ["invited", "joined"]:
+ # We now loop through all state events looking for
+ # membership states for the requesting server to determine
+ # if the server is either in the room or has been invited
+ # into the room.
+ for ev in itervalues(state):
+ if ev.type != EventTypes.Member:
+ continue
+ try:
+ domain = get_domain_from_id(ev.state_key)
+ except Exception:
+ continue
+
+ if domain != server_name:
+ continue
+
+ memtype = ev.membership
+ if memtype == Membership.JOIN:
+ return event
+ elif memtype == Membership.INVITE:
+ if visibility == "invited":
+ return event
+ else:
+ # server has no users in the room: redact
+ return prune_event(event)
+
+ return event
+
+ # Next lets check to see if all the events have a history visibility
+ # of "shared" or "world_readable". If thats the case then we don't
+ # need to check membership (as we know the server is in the room).
+ event_to_state_ids = yield store.get_state_ids_for_events(
+ frozenset(e.event_id for e in events),
+ types=(
+ (EventTypes.RoomHistoryVisibility, ""),
+ )
+ )
+
+ visibility_ids = set()
+ for sids in itervalues(event_to_state_ids):
+ hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
+ if hist:
+ visibility_ids.add(hist)
+
+ # If we failed to find any history visibility events then the default
+ # is "shared" visiblity.
+ if not visibility_ids:
+ all_open = True
+ else:
+ event_map = yield store.get_events(visibility_ids)
+ all_open = all(
+ e.content.get("history_visibility") in (None, "shared", "world_readable")
+ for e in itervalues(event_map)
+ )
+
+ if all_open:
+ # all the history_visibility state affecting these events is open, so
+ # we don't need to filter by membership state. We *do* need to check
+ # for user erasure, though.
+ if erased_senders:
+ events = [
+ redact_disallowed(e, None)
+ for e in events
+ ]
+
+ defer.returnValue(events)
+
+ # Ok, so we're dealing with events that have non-trivial visibility
+ # rules, so we need to also get the memberships of the room.
+
+ # first, for each event we're wanting to return, get the event_ids
+ # of the history vis and membership state at those events.
+ event_to_state_ids = yield store.get_state_ids_for_events(
+ frozenset(e.event_id for e in events),
+ types=(
+ (EventTypes.RoomHistoryVisibility, ""),
+ (EventTypes.Member, None),
+ )
+ )
+
+ # We only want to pull out member events that correspond to the
+ # server's domain.
+ #
+ # event_to_state_ids contains lots of duplicates, so it turns out to be
+ # cheaper to build a complete set of unique
+ # ((type, state_key), event_id) tuples, and then filter out the ones we
+ # don't want.
+ #
+ state_key_to_event_id_set = {
+ e
+ for key_to_eid in itervalues(event_to_state_ids)
+ for e in key_to_eid.items()
+ }
+
+ def include(typ, state_key):
+ if typ != EventTypes.Member:
+ return True
+
+ # we avoid using get_domain_from_id here for efficiency.
+ idx = state_key.find(":")
+ if idx == -1:
+ return False
+ return state_key[idx + 1:] == server_name
+
+ event_map = yield store.get_events([
+ e_id
+ for key, e_id in state_key_to_event_id_set
+ if include(key[0], key[1])
+ ])
+
+ event_to_state = {
+ e_id: {
+ key: event_map[inner_e_id]
+ for key, inner_e_id in iteritems(key_to_eid)
+ if inner_e_id in event_map
+ }
+ for e_id, key_to_eid in iteritems(event_to_state_ids)
+ }
+
+ defer.returnValue([
+ redact_disallowed(e, event_to_state[e.event_id])
+ for e in events
+ ])
|