diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index a31a9a17e0..eed8c67e6a 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -226,7 +226,7 @@ class Filtering(object):
jsonschema.validate(user_filter_json, USER_FILTER_SCHEMA,
format_checker=FormatChecker())
except jsonschema.ValidationError as e:
- raise SynapseError(400, e.message)
+ raise SynapseError(400, str(e))
class FilterCollection(object):
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index 71347912f1..6d9f1ca0ef 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -64,7 +64,7 @@ class ConsentURIBuilder(object):
"""
mac = hmac.new(
key=self._hmac_secret,
- msg=user_id,
+ msg=user_id.encode('ascii'),
digestmod=sha256,
).hexdigest()
consent_uri = "%s_matrix/consent?%s" % (
diff --git a/synapse/app/__init__.py b/synapse/app/__init__.py
index 3b6b9368b8..c3afcc573b 100644
--- a/synapse/app/__init__.py
+++ b/synapse/app/__init__.py
@@ -24,7 +24,7 @@ try:
python_dependencies.check_requirements()
except python_dependencies.MissingRequirementError as e:
message = "\n".join([
- "Missing Requirement: %s" % (e.message,),
+ "Missing Requirement: %s" % (str(e),),
"To install run:",
" pip install --upgrade --force \"%s\"" % (e.dependency,),
"",
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 02039f7e79..8559e141af 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -136,7 +136,7 @@ def start(config_options):
"Synapse appservice", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.appservice"
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 4c73c637bb..76aed8c60a 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -153,7 +153,7 @@ def start(config_options):
"Synapse client reader", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.client_reader"
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index bc82197b2a..9060ab14f6 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -169,7 +169,7 @@ def start(config_options):
"Synapse event creator", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.event_creator"
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 18ca71ef99..228a297fb8 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -140,7 +140,7 @@ def start(config_options):
"Synapse federation reader", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.federation_reader"
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 6501c57792..e9a99d76e1 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -160,7 +160,7 @@ def start(config_options):
"Synapse federation sender", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.federation_sender"
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index b076fbe522..fc4b25de1c 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -228,7 +228,7 @@ def start(config_options):
"Synapse frontend proxy", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.frontend_proxy"
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 8c5d858b0b..e3f0d99a3f 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -301,7 +301,7 @@ class SynapseHomeServer(HomeServer):
try:
database_engine.check_database(db_conn.cursor())
except IncorrectDatabaseSetup as e:
- quit_with_error(e.message)
+ quit_with_error(str(e))
# Gauges to expose monthly active user control metrics
@@ -328,7 +328,7 @@ def setup(config_options):
config_options,
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
if not config:
@@ -386,7 +386,6 @@ def setup(config_options):
hs.get_pusherpool().start()
hs.get_datastore().start_profiling()
hs.get_datastore().start_doing_background_updates()
- hs.get_federation_client().start_get_pdu_cache()
reactor.callWhenRunning(start)
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 992d182dba..acc0487adc 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -133,7 +133,7 @@ def start(config_options):
"Synapse media repository", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.media_repository"
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 2ec4c7defb..630dcda478 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -191,7 +191,7 @@ def start(config_options):
"Synapse pusher", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.pusher"
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index df81b7bcbe..9a7fc6ee9d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -410,7 +410,7 @@ def start(config_options):
"Synapse synchrotron", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.synchrotron"
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index b383e79c1c..0a5f62b509 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -188,7 +188,7 @@ def start(config_options):
"Synapse user directory", config_options
)
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
assert config.worker_app == "synapse.app.user_dir"
diff --git a/synapse/config/__main__.py b/synapse/config/__main__.py
index 58c97a70af..8fccf573ee 100644
--- a/synapse/config/__main__.py
+++ b/synapse/config/__main__.py
@@ -25,7 +25,7 @@ if __name__ == "__main__":
try:
config = HomeServerConfig.load_config("", sys.argv[3:])
except ConfigError as e:
- sys.stderr.write("\n" + e.message + "\n")
+ sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
print (getattr(config, key))
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 5a92428f56..d05ed91d64 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -209,8 +209,6 @@ class FederationClient(FederationBase):
Will attempt to get the PDU from each destination in the list until
one succeeds.
- This will persist the PDU locally upon receipt.
-
Args:
destinations (list): Which home servers to query
event_id (str): event to fetch
@@ -289,8 +287,7 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
@log_function
def get_state_for_room(self, destination, room_id, event_id):
- """Requests all of the `current` state PDUs for a given room from
- a remote home server.
+ """Requests all of the room state at a given event from a remote home server.
Args:
destination (str): The remote homeserver to query for the state.
@@ -298,9 +295,10 @@ class FederationClient(FederationBase):
event_id (str): The id of the event we want the state at.
Returns:
- Deferred: Results in a list of PDUs.
+ Deferred[Tuple[List[EventBase], List[EventBase]]]:
+ A list of events in the state, and a list of events in the auth chain
+ for the given event.
"""
-
try:
# First we try and ask for just the IDs, as thats far quicker if
# we have most of the state and auth_chain already.
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9a571e4fc7..819e8f7331 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -46,6 +46,7 @@ from synapse.replication.http.federation import (
from synapse.types import get_domain_from_id
from synapse.util.async_helpers import Linearizer, concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.logcontext import nested_logging_context
from synapse.util.logutils import log_function
# when processing incoming transactions, we try to handle multiple rooms in
@@ -187,21 +188,22 @@ class FederationServer(FederationBase):
for pdu in pdus_by_room[room_id]:
event_id = pdu.event_id
- try:
- yield self._handle_received_pdu(
- origin, pdu
- )
- pdu_results[event_id] = {}
- except FederationError as e:
- logger.warn("Error handling PDU %s: %s", event_id, e)
- pdu_results[event_id] = {"error": str(e)}
- except Exception as e:
- f = failure.Failure()
- pdu_results[event_id] = {"error": str(e)}
- logger.error(
- "Failed to handle PDU %s: %s",
- event_id, f.getTraceback().rstrip(),
- )
+ with nested_logging_context(event_id):
+ try:
+ yield self._handle_received_pdu(
+ origin, pdu
+ )
+ pdu_results[event_id] = {}
+ except FederationError as e:
+ logger.warn("Error handling PDU %s: %s", event_id, e)
+ pdu_results[event_id] = {"error": str(e)}
+ except Exception as e:
+ f = failure.Failure()
+ pdu_results[event_id] = {"error": str(e)}
+ logger.error(
+ "Failed to handle PDU %s: %s",
+ event_id, f.getTraceback().rstrip(),
+ )
yield concurrently_execute(
process_pdus_for_room, pdus_by_room.keys(),
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 8cbf8c4f7f..98b5950800 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -137,26 +137,6 @@ class TransactionQueue(object):
self._processing_pending_presence = False
- def can_send_to(self, destination):
- """Can we send messages to the given server?
-
- We can't send messages to ourselves. If we are running on localhost
- then we can only federation with other servers running on localhost.
- Otherwise we only federate with servers on a public domain.
-
- Args:
- destination(str): The server we are possibly trying to send to.
- Returns:
- bool: True if we can send to the server.
- """
-
- if destination == self.server_name:
- return False
- if self.server_name.startswith("localhost"):
- return destination.startswith("localhost")
- else:
- return not destination.startswith("localhost")
-
def notify_new_events(self, current_id):
"""This gets called when we have some new events we might want to
send out to other servers.
@@ -279,10 +259,7 @@ class TransactionQueue(object):
self._order += 1
destinations = set(destinations)
- destinations = set(
- dest for dest in destinations if self.can_send_to(dest)
- )
-
+ destinations.discard(self.server_name)
logger.debug("Sending to: %s", str(destinations))
if not destinations:
@@ -358,7 +335,7 @@ class TransactionQueue(object):
for destinations, states in hosts_and_states:
for destination in destinations:
- if not self.can_send_to(destination):
+ if destination == self.server_name:
continue
self.pending_presence_by_dest.setdefault(
@@ -377,7 +354,8 @@ class TransactionQueue(object):
content=content,
)
- if not self.can_send_to(destination):
+ if destination == self.server_name:
+ logger.info("Not sending EDU to ourselves")
return
sent_edus_counter.inc()
@@ -392,10 +370,8 @@ class TransactionQueue(object):
self._attempt_new_transaction(destination)
def send_device_messages(self, destination):
- if destination == self.server_name or destination == "localhost":
- return
-
- if not self.can_send_to(destination):
+ if destination == self.server_name:
+ logger.info("Not sending device update to ourselves")
return
self._attempt_new_transaction(destination)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 578e9250fb..9dc46aa15f 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -341,7 +341,7 @@ class E2eKeysHandler(object):
def _exception_to_failure(e):
if isinstance(e, CodeMessageException):
return {
- "status": e.code, "message": e.message,
+ "status": e.code, "message": str(e),
}
if isinstance(e, NotRetryingDestination):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 2ccdc3bfa7..d05b63673f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -106,7 +106,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
- self.store = hs.get_datastore()
+ self.store = hs.get_datastore() # type: synapse.storage.DataStore
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
@@ -323,14 +323,22 @@ class FederationHandler(BaseHandler):
affected=pdu.event_id,
)
- # Calculate the state of the previous events, and
- # de-conflict them to find the current state.
- state_groups = []
+ # Calculate the state after each of the previous events, and
+ # resolve them to find the correct state at the current event.
auth_chains = set()
+ event_map = {
+ event_id: pdu,
+ }
try:
# Get the state of the events we know about
- ours = yield self.store.get_state_groups(room_id, list(seen))
- state_groups.append(ours)
+ ours = yield self.store.get_state_groups_ids(room_id, seen)
+
+ # state_maps is a list of mappings from (type, state_key) to event_id
+ # type: list[dict[tuple[str, str], str]]
+ state_maps = list(ours.values())
+
+ # we don't need this any more, let's delete it.
+ del ours
# Ask the remote server for the states we don't
# know about
@@ -339,27 +347,65 @@ class FederationHandler(BaseHandler):
"[%s %s] Requesting state at missing prev_event %s",
room_id, event_id, p,
)
- state, got_auth_chain = (
- yield self.federation_client.get_state_for_room(
- origin, room_id, p,
+
+ with logcontext.nested_logging_context(p):
+ # note that if any of the missing prevs share missing state or
+ # auth events, the requests to fetch those events are deduped
+ # by the get_pdu_cache in federation_client.
+ remote_state, got_auth_chain = (
+ yield self.federation_client.get_state_for_room(
+ origin, room_id, p,
+ )
)
- )
- auth_chains.update(got_auth_chain)
- state_group = {(x.type, x.state_key): x.event_id for x in state}
- state_groups.append(state_group)
+
+ # we want the state *after* p; get_state_for_room returns the
+ # state *before* p.
+ remote_event = yield self.federation_client.get_pdu(
+ [origin], p, outlier=True,
+ )
+
+ if remote_event is None:
+ raise Exception(
+ "Unable to get missing prev_event %s" % (p, )
+ )
+
+ if remote_event.is_state():
+ remote_state.append(remote_event)
+
+ # XXX hrm I'm not convinced that duplicate events will compare
+ # for equality, so I'm not sure this does what the author
+ # hoped.
+ auth_chains.update(got_auth_chain)
+
+ remote_state_map = {
+ (x.type, x.state_key): x.event_id for x in remote_state
+ }
+ state_maps.append(remote_state_map)
+
+ for x in remote_state:
+ event_map[x.event_id] = x
# Resolve any conflicting state
+ @defer.inlineCallbacks
def fetch(ev_ids):
- return self.store.get_events(
- ev_ids, get_prev_content=False, check_redacted=False
+ fetched = yield self.store.get_events(
+ ev_ids, get_prev_content=False, check_redacted=False,
)
+ # add any events we fetch here to the `event_map` so that we
+ # can use them to build the state event list below.
+ event_map.update(fetched)
+ defer.returnValue(fetched)
room_version = yield self.store.get_room_version(room_id)
state_map = yield resolve_events_with_factory(
- room_version, state_groups, {event_id: pdu}, fetch
+ room_version, state_maps, event_map, fetch,
)
- state = (yield self.store.get_events(state_map.values())).values()
+ # we need to give _process_received_pdu the actual state events
+ # rather than event ids, so generate that now.
+ state = [
+ event_map[e] for e in six.itervalues(state_map)
+ ]
auth_chain = list(auth_chains)
except Exception:
logger.warn(
@@ -483,20 +529,21 @@ class FederationHandler(BaseHandler):
"[%s %s] Handling received prev_event %s",
room_id, event_id, ev.event_id,
)
- try:
- yield self.on_receive_pdu(
- origin,
- ev,
- sent_to_us_directly=False,
- )
- except FederationError as e:
- if e.code == 403:
- logger.warn(
- "[%s %s] Received prev_event %s failed history check.",
- room_id, event_id, ev.event_id,
+ with logcontext.nested_logging_context(ev.event_id):
+ try:
+ yield self.on_receive_pdu(
+ origin,
+ ev,
+ sent_to_us_directly=False,
)
- else:
- raise
+ except FederationError as e:
+ if e.code == 403:
+ logger.warn(
+ "[%s %s] Received prev_event %s failed history check.",
+ room_id, event_id, ev.event_id,
+ )
+ else:
+ raise
@defer.inlineCallbacks
def _process_received_pdu(self, origin, event, state, auth_chain):
@@ -572,6 +619,10 @@ class FederationHandler(BaseHandler):
})
seen_ids.add(e.event_id)
+ logger.info(
+ "[%s %s] persisting newly-received auth/state events %s",
+ room_id, event_id, [e["event"].event_id for e in event_infos]
+ )
yield self._handle_new_events(origin, event_infos)
try:
@@ -1135,7 +1186,8 @@ class FederationHandler(BaseHandler):
try:
logger.info("Processing queued PDU %s which was received "
"while we were joining %s", p.event_id, p.room_id)
- yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
+ with logcontext.nested_logging_context(p.event_id):
+ yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
except Exception as e:
logger.warn(
"Error handling queued PDU %s from %s: %s",
@@ -1581,15 +1633,22 @@ class FederationHandler(BaseHandler):
Notifies about the events where appropriate.
"""
- contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
- [
- logcontext.run_in_background(
- self._prep_event,
+
+ @defer.inlineCallbacks
+ def prep(ev_info):
+ event = ev_info["event"]
+ with logcontext.nested_logging_context(suffix=event.event_id):
+ res = yield self._prep_event(
origin,
- ev_info["event"],
+ event,
state=ev_info.get("state"),
auth_events=ev_info.get("auth_events"),
)
+ defer.returnValue(res)
+
+ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
+ [
+ logcontext.run_in_background(prep, ev_info)
for ev_info in event_infos
], consumeErrors=True,
))
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 75b8b7ce6a..f284d5a385 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -278,7 +278,7 @@ class BaseProfileHandler(BaseHandler):
except Exception as e:
logger.warn(
"Failed to update join event for room %s - %s",
- room_id, str(e.message)
+ room_id, str(e)
)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 2d2d3d5a0d..c610933dd4 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -20,6 +20,7 @@ from twisted.internet import defer
from synapse.api.errors import AuthError, SynapseError
from synapse.types import UserID, get_domain_from_id
+from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
@@ -68,6 +69,11 @@ class TypingHandler(object):
# map room IDs to sets of users currently typing
self._room_typing = {}
+ # caches which room_ids changed at which serials
+ self._typing_stream_change_cache = StreamChangeCache(
+ "TypingStreamChangeCache", self._latest_room_serial,
+ )
+
self.clock.looping_call(
self._handle_timeouts,
5000,
@@ -218,6 +224,7 @@ class TypingHandler(object):
for domain in set(get_domain_from_id(u) for u in users):
if domain != self.server_name:
+ logger.debug("sending typing update to %s", domain)
self.federation.send_edu(
destination=domain,
edu_type="m.typing",
@@ -274,19 +281,29 @@ class TypingHandler(object):
self._latest_room_serial += 1
self._room_serials[member.room_id] = self._latest_room_serial
+ self._typing_stream_change_cache.entity_has_changed(
+ member.room_id, self._latest_room_serial,
+ )
self.notifier.on_new_event(
"typing_key", self._latest_room_serial, rooms=[member.room_id]
)
def get_all_typing_updates(self, last_id, current_id):
- # TODO: Work out a way to do this without scanning the entire state.
if last_id == current_id:
return []
+ changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
+ last_id,
+ )
+
+ if changed_rooms is None:
+ changed_rooms = self._room_serials
+
rows = []
- for room_id, serial in self._room_serials.items():
- if last_id < serial and serial <= current_id:
+ for room_id in changed_rooms:
+ serial = self._room_serials[room_id]
+ if last_id < serial <= current_id:
typing = self._room_typing[room_id]
rows.append((serial, room_id, list(typing)))
rows.sort()
diff --git a/synapse/notifier.py b/synapse/notifier.py
index f1d92c1395..340b16ce25 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -24,9 +24,10 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import StreamToken
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
-from synapse.util.logcontext import PreserveLoggingContext, run_in_background
+from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.visibility import filter_events_for_client
@@ -248,7 +249,10 @@ class Notifier(object):
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
- run_in_background(self._notify_app_services, room_stream_id)
+ run_as_background_process(
+ "notify_app_services",
+ self._notify_app_services, room_stream_id,
+ )
if self.federation_sender:
self.federation_sender.notify_new_events(room_stream_id)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index c779f69fa0..0f339a0320 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -33,31 +33,32 @@ logger = logging.getLogger(__name__)
# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
REQUIREMENTS = {
"jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
- "frozendict>=0.4": ["frozendict"],
+ "frozendict>=1": ["frozendict"],
"unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"],
"canonicaljson>=1.1.3": ["canonicaljson>=1.1.3"],
"signedjson>=1.0.0": ["signedjson>=1.0.0"],
"pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
- "service_identity>=1.0.0": ["service_identity>=1.0.0"],
+ "service_identity>=16.0.0": ["service_identity>=16.0.0"],
"Twisted>=17.1.0": ["twisted>=17.1.0"],
"treq>=15.1": ["treq>=15.1"],
# Twisted has required pyopenssl 16.0 since about Twisted 16.6.
"pyopenssl>=16.0.0": ["OpenSSL>=16.0.0"],
- "pyyaml": ["yaml"],
- "pyasn1": ["pyasn1"],
- "daemonize": ["daemonize"],
- "bcrypt": ["bcrypt>=3.1.0"],
- "pillow": ["PIL"],
- "pydenticon": ["pydenticon"],
- "sortedcontainers": ["sortedcontainers"],
- "pysaml2>=3.0.0": ["saml2>=3.0.0"],
- "pymacaroons-pynacl": ["pymacaroons"],
+ "pyyaml>=3.11": ["yaml"],
+ "pyasn1>=0.1.9": ["pyasn1"],
+ "pyasn1-modules>=0.0.7": ["pyasn1_modules"],
+ "daemonize>=2.3.1": ["daemonize"],
+ "bcrypt>=3.1.0": ["bcrypt>=3.1.0"],
+ "pillow>=3.1.2": ["PIL"],
+ "pydenticon>=0.2": ["pydenticon"],
+ "sortedcontainers>=1.4.4": ["sortedcontainers"],
+ "pysaml2>=3.0.0": ["saml2"],
+ "pymacaroons-pynacl>=0.9.3": ["pymacaroons"],
"msgpack-python>=0.3.0": ["msgpack"],
"phonenumbers>=8.2.0": ["phonenumbers"],
- "six": ["six"],
- "prometheus_client": ["prometheus_client"],
+ "six>=1.10": ["six"],
+ "prometheus_client>=0.0.18": ["prometheus_client"],
# we use attr.s(slots), which arrived in 16.0.0
"attrs>=16.0.0": ["attr>=16.0.0"],
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index c95477d318..7a7157b352 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -65,10 +65,15 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
for event_ids in itervalues(conflicted_state)
for event_id in event_ids
)
+ needed_event_count = len(needed_events)
if event_map is not None:
needed_events -= set(iterkeys(event_map))
- logger.info("Asking for %d conflicted events", len(needed_events))
+ logger.info(
+ "Asking for %d/%d conflicted events",
+ len(needed_events),
+ needed_event_count,
+ )
# dict[str, FrozenEvent]: a map from state event id to event. Only includes
# the state events which are in conflict (and those in event_map)
@@ -85,11 +90,16 @@ def resolve_events_with_factory(state_sets, event_map, state_map_factory):
)
new_needed_events = set(itervalues(auth_events))
+ new_needed_event_count = len(new_needed_events)
new_needed_events -= needed_events
if event_map is not None:
new_needed_events -= set(iterkeys(event_map))
- logger.info("Asking for %d auth events", len(new_needed_events))
+ logger.info(
+ "Asking for %d/%d auth events",
+ len(new_needed_events),
+ new_needed_event_count,
+ )
state_map_new = yield state_map_factory(new_needed_events)
state_map.update(state_map_new)
diff --git a/synapse/storage/monthly_active_users.py b/synapse/storage/monthly_active_users.py
index 59580949f1..0fe8c8e24c 100644
--- a/synapse/storage/monthly_active_users.py
+++ b/synapse/storage/monthly_active_users.py
@@ -172,6 +172,10 @@ class MonthlyActiveUsersStore(SQLBaseStore):
Deferred[bool]: True if a new entry was created, False if an
existing one was updated.
"""
+ # Am consciously deciding to lock the table on the basis that is ought
+ # never be a big table and alternative approaches (batching multiple
+ # upserts into a single txn) introduced a lot of extra complexity.
+ # See https://github.com/matrix-org/synapse/issues/3854 for more
is_insert = yield self._simple_upsert(
desc="upsert_monthly_active_user",
table="monthly_active_users",
@@ -181,7 +185,6 @@ class MonthlyActiveUsersStore(SQLBaseStore):
values={
"timestamp": int(self._clock.time_msec()),
},
- lock=False,
)
if is_insert:
self.user_last_seen_monthly_active.invalidate((user_id,))
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 4b971efdba..3f4cbd61c4 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -255,7 +255,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
)
@defer.inlineCallbacks
- def get_state_groups_ids(self, room_id, event_ids):
+ def get_state_groups_ids(self, _room_id, event_ids):
+ """Get the event IDs of all the state for the state groups for the given events
+
+ Args:
+ _room_id (str): id of the room for these events
+ event_ids (iterable[str]): ids of the events
+
+ Returns:
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
+ """
if not event_ids:
defer.returnValue({})
@@ -270,7 +280,7 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_state_ids_for_group(self, state_group):
- """Get the state IDs for the given state group
+ """Get the event IDs of all the state in the given state group
Args:
state_group (int)
@@ -286,7 +296,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
def get_state_groups(self, room_id, event_ids):
""" Get the state groups for the given list of event_ids
- The return value is a dict mapping group names to lists of events.
+ Returns:
+ Deferred[dict[int, list[EventBase]]]:
+ dict of state_group_id -> list of state events.
"""
if not event_ids:
defer.returnValue({})
@@ -324,7 +336,9 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
member events (if True), or to exclude member events (if False)
Returns:
- dictionary state_group -> (dict of (type, state_key) -> event id)
+ Returns:
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
results = {}
@@ -732,8 +746,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
If None, `types` filtering is applied to all events.
Returns:
- Deferred[dict[int, dict[(type, state_key), EventBase]]]
- a dictionary mapping from state group to state dictionary.
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
if types is not None:
non_member_types = [t for t in types if t[0] != EventTypes.Member]
@@ -788,8 +802,8 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
If None, `types` filtering is applied to all events.
Returns:
- Deferred[dict[int, dict[(type, state_key), EventBase]]]
- a dictionary mapping from state group to state dictionary.
+ Deferred[dict[int, dict[tuple[str, str], str]]]:
+ dict of state_group_id -> (dict of (type, state_key) -> event id)
"""
if types:
types = frozenset(types)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index baf0379a68..ab54977a75 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -23,6 +23,7 @@ from canonicaljson import encode_canonical_json
from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.util.caches.expiringcache import ExpiringCache
from ._base import SQLBaseStore, db_to_json
@@ -49,6 +50,8 @@ _UpdateTransactionRow = namedtuple(
)
)
+SENTINEL = object()
+
class TransactionStore(SQLBaseStore):
"""A collection of queries for handling PDUs.
@@ -59,6 +62,12 @@ class TransactionStore(SQLBaseStore):
self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
+ self._destination_retry_cache = ExpiringCache(
+ cache_name="get_destination_retry_timings",
+ clock=self._clock,
+ expiry_ms=5 * 60 * 1000,
+ )
+
def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have
already responded to it. If so, return the response code and response
@@ -155,6 +164,7 @@ class TransactionStore(SQLBaseStore):
"""
pass
+ @defer.inlineCallbacks
def get_destination_retry_timings(self, destination):
"""Gets the current retry timings (if any) for a given destination.
@@ -165,10 +175,20 @@ class TransactionStore(SQLBaseStore):
None if not retrying
Otherwise a dict for the retry scheme
"""
- return self.runInteraction(
+
+ result = self._destination_retry_cache.get(destination, SENTINEL)
+ if result is not SENTINEL:
+ defer.returnValue(result)
+
+ result = yield self.runInteraction(
"get_destination_retry_timings",
self._get_destination_retry_timings, destination)
+ # We don't hugely care about race conditions between getting and
+ # invalidating the cache, since we time out fairly quickly anyway.
+ self._destination_retry_cache[destination] = result
+ defer.returnValue(result)
+
def _get_destination_retry_timings(self, txn, destination):
result = self._simple_select_one_txn(
txn,
@@ -196,6 +216,7 @@ class TransactionStore(SQLBaseStore):
retry_interval (int) - how long until next retry in ms
"""
+ self._destination_retry_cache.pop(destination)
return self.runInteraction(
"set_destination_retry_timings",
self._set_destination_retry_timings,
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 9af4ec4aa8..f369780277 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,7 +16,7 @@
import logging
from collections import OrderedDict
-from six import itervalues
+from six import iteritems, itervalues
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache
@@ -24,6 +24,9 @@ from synapse.util.caches import register_cache
logger = logging.getLogger(__name__)
+SENTINEL = object()
+
+
class ExpiringCache(object):
def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,
reset_expiry_on_get=False, iterable=False):
@@ -95,6 +98,21 @@ class ExpiringCache(object):
return entry.value
+ def pop(self, key, default=SENTINEL):
+ """Removes and returns the value with the given key from the cache.
+
+ If the key isn't in the cache then `default` will be returned if
+ specified, otherwise `KeyError` will get raised.
+
+ Identical functionality to `dict.pop(..)`.
+ """
+
+ value = self._cache.pop(key, default)
+ if value is SENTINEL:
+ raise KeyError(key)
+
+ return value
+
def __contains__(self, key):
return key in self._cache
@@ -122,7 +140,7 @@ class ExpiringCache(object):
keys_to_delete = set()
- for key, cache_entry in self._cache.items():
+ for key, cache_entry in iteritems(self._cache):
if now - cache_entry.time > self._expiry_ms:
keys_to_delete.add(key)
@@ -146,6 +164,8 @@ class ExpiringCache(object):
class _CacheEntry(object):
+ __slots__ = ["time", "value"]
+
def __init__(self, time, value):
self.time = time
self.value = value
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index a0c2d37610..89224b26cc 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -200,7 +200,7 @@ class LoggingContext(object):
sentinel = Sentinel()
- def __init__(self, name=None, parent_context=None):
+ def __init__(self, name=None, parent_context=None, request=None):
self.previous_context = LoggingContext.current_context()
self.name = name
@@ -218,6 +218,13 @@ class LoggingContext(object):
self.parent_context = parent_context
+ if self.parent_context is not None:
+ self.parent_context.copy_to(self)
+
+ if request is not None:
+ # the request param overrides the request from the parent context
+ self.request = request
+
def __str__(self):
return "%s@%x" % (self.name, id(self))
@@ -256,9 +263,6 @@ class LoggingContext(object):
)
self.alive = True
- if self.parent_context is not None:
- self.parent_context.copy_to(self)
-
return self
def __exit__(self, type, value, traceback):
@@ -439,6 +443,35 @@ class PreserveLoggingContext(object):
)
+def nested_logging_context(suffix, parent_context=None):
+ """Creates a new logging context as a child of another.
+
+ The nested logging context will have a 'request' made up of the parent context's
+ request, plus the given suffix.
+
+ CPU/db usage stats will be added to the parent context's on exit.
+
+ Normal usage looks like:
+
+ with nested_logging_context(suffix):
+ # ... do stuff
+
+ Args:
+ suffix (str): suffix to add to the parent context's 'request'.
+ parent_context (LoggingContext|None): parent context. Will use the current context
+ if None.
+
+ Returns:
+ LoggingContext: new logging context.
+ """
+ if parent_context is None:
+ parent_context = LoggingContext.current_context()
+ return LoggingContext(
+ parent_context=parent_context,
+ request=parent_context.request + "-" + suffix,
+ )
+
+
def preserve_fn(f):
"""Function decorator which wraps the function with run_in_background"""
def g(*args, **kwargs):
|