diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py
index 6450a12890..68a9de17b8 100644
--- a/synapse/appservice/scheduler.py
+++ b/synapse/appservice/scheduler.py
@@ -150,12 +150,12 @@ class _TransactionController(object):
if service_is_up:
sent = yield txn.send(self.as_api)
if sent:
- txn.complete(self.store)
+ yield txn.complete(self.store)
else:
- self._start_recoverer(service)
+ preserve_fn(self._start_recoverer)(service)
except Exception as e:
logger.exception(e)
- self._start_recoverer(service)
+ preserve_fn(self._start_recoverer)(service)
@defer.inlineCallbacks
def on_recovered(self, recoverer):
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 1735ca9345..d7211ee9b3 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -308,15 +308,15 @@ class Keyring(object):
@defer.inlineCallbacks
def get_keys_from_store(self, server_name_and_key_ids):
- res = yield defer.gatherResults(
+ res = yield preserve_context_over_deferred(defer.gatherResults(
[
- self.store.get_server_verify_keys(
+ preserve_fn(self.store.get_server_verify_keys)(
server_name, key_ids
).addCallback(lambda ks, server: (server, ks), server_name)
for server_name, key_ids in server_name_and_key_ids
],
consumeErrors=True,
- ).addErrback(unwrapFirstError)
+ )).addErrback(unwrapFirstError)
defer.returnValue(dict(res))
@@ -337,13 +337,13 @@ class Keyring(object):
)
defer.returnValue({})
- results = yield defer.gatherResults(
+ results = yield preserve_context_over_deferred(defer.gatherResults(
[
- get_key(p_name, p_keys)
+ preserve_fn(get_key)(p_name, p_keys)
for p_name, p_keys in self.perspective_servers.items()
],
consumeErrors=True,
- ).addErrback(unwrapFirstError)
+ )).addErrback(unwrapFirstError)
union_of_keys = {}
for result in results:
@@ -383,13 +383,13 @@ class Keyring(object):
defer.returnValue(keys)
- results = yield defer.gatherResults(
+ results = yield preserve_context_over_deferred(defer.gatherResults(
[
- get_key(server_name, key_ids)
+ preserve_fn(get_key)(server_name, key_ids)
for server_name, key_ids in server_name_and_key_ids
],
consumeErrors=True,
- ).addErrback(unwrapFirstError)
+ )).addErrback(unwrapFirstError)
merged = {}
for result in results:
@@ -466,9 +466,9 @@ class Keyring(object):
for server_name, response_keys in processed_response.items():
keys.setdefault(server_name, {}).update(response_keys)
- yield defer.gatherResults(
+ yield preserve_context_over_deferred(defer.gatherResults(
[
- self.store_keys(
+ preserve_fn(self.store_keys)(
server_name=server_name,
from_server=perspective_name,
verify_keys=response_keys,
@@ -476,7 +476,7 @@ class Keyring(object):
for server_name, response_keys in keys.items()
],
consumeErrors=True
- ).addErrback(unwrapFirstError)
+ )).addErrback(unwrapFirstError)
defer.returnValue(keys)
@@ -524,7 +524,7 @@ class Keyring(object):
keys.update(response_keys)
- yield defer.gatherResults(
+ yield preserve_context_over_deferred(defer.gatherResults(
[
preserve_fn(self.store_keys)(
server_name=key_server_name,
@@ -534,7 +534,7 @@ class Keyring(object):
for key_server_name, verify_keys in keys.items()
],
consumeErrors=True
- ).addErrback(unwrapFirstError)
+ )).addErrback(unwrapFirstError)
defer.returnValue(keys)
@@ -600,7 +600,7 @@ class Keyring(object):
response_keys.update(verify_keys)
response_keys.update(old_verify_keys)
- yield defer.gatherResults(
+ yield preserve_context_over_deferred(defer.gatherResults(
[
preserve_fn(self.store.store_server_keys_json)(
server_name=server_name,
@@ -613,7 +613,7 @@ class Keyring(object):
for key_id in updated_key_ids
],
consumeErrors=True,
- ).addErrback(unwrapFirstError)
+ )).addErrback(unwrapFirstError)
results[server_name] = response_keys
@@ -702,7 +702,7 @@ class Keyring(object):
A deferred that completes when the keys are stored.
"""
# TODO(markjh): Store whether the keys have expired.
- yield defer.gatherResults(
+ yield preserve_context_over_deferred(defer.gatherResults(
[
preserve_fn(self.store.store_server_verify_key)(
server_name, server_name, key.time_added, key
@@ -710,4 +710,4 @@ class Keyring(object):
for key_id, key in verify_keys.items()
],
consumeErrors=True,
- ).addErrback(unwrapFirstError)
+ )).addErrback(unwrapFirstError)
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index da2f5e8cfd..2339cc9034 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -23,6 +23,7 @@ from synapse.crypto.event_signing import check_event_content_hash
from synapse.api.errors import SynapseError
from synapse.util import unwrapFirstError
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
import logging
@@ -102,10 +103,10 @@ class FederationBase(object):
warn, pdu
)
- valid_pdus = yield defer.gatherResults(
+ valid_pdus = yield preserve_context_over_deferred(defer.gatherResults(
deferreds,
consumeErrors=True
- ).addErrback(unwrapFirstError)
+ )).addErrback(unwrapFirstError)
if include_none:
defer.returnValue(valid_pdus)
@@ -129,7 +130,7 @@ class FederationBase(object):
for pdu in pdus
]
- deferreds = self.keyring.verify_json_objects_for_server([
+ deferreds = preserve_fn(self.keyring.verify_json_objects_for_server)([
(p.origin, p.get_pdu_json())
for p in redacted_pdus
])
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 9ba3151713..f2b3aceb49 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -27,6 +27,7 @@ from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.events import FrozenEvent
import synapse.metrics
@@ -225,10 +226,10 @@ class FederationClient(FederationBase):
]
# FIXME: We should handle signature failures more gracefully.
- pdus[:] = yield defer.gatherResults(
+ pdus[:] = yield preserve_context_over_deferred(defer.gatherResults(
self._check_sigs_and_hashes(pdus),
consumeErrors=True,
- ).addErrback(unwrapFirstError)
+ )).addErrback(unwrapFirstError)
defer.returnValue(pdus)
@@ -457,14 +458,16 @@ class FederationClient(FederationBase):
batch = set(missing_events[i:i + batch_size])
deferreds = [
- self.get_pdu(
+ preserve_fn(self.get_pdu)(
destinations=random_server_list(),
event_id=e_id,
)
for e_id in batch
]
- res = yield defer.DeferredList(deferreds, consumeErrors=True)
+ res = yield preserve_context_over_deferred(
+ defer.DeferredList(deferreds, consumeErrors=True)
+ )
for success, result in res:
if success:
signed_events.append(result)
@@ -853,14 +856,16 @@ class FederationClient(FederationBase):
return srvs
deferreds = [
- self.get_pdu(
+ preserve_fn(self.get_pdu)(
destinations=random_server_list(),
event_id=e_id,
)
for e_id, depth in ordered_missing[:limit - len(signed_events)]
]
- res = yield defer.DeferredList(deferreds, consumeErrors=True)
+ res = yield preserve_context_over_deferred(
+ defer.DeferredList(deferreds, consumeErrors=True)
+ )
for (result, val), (e_id, _) in zip(res, ordered_missing):
if result and val:
signed_events.append(val)
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index dd285452cd..306686a384 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.util.metrics import Measure
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
import logging
@@ -163,10 +163,10 @@ class ApplicationServicesHandler(object):
def query_3pe(self, kind, protocol, fields):
services = yield self._get_services_for_3pn(protocol)
- results = yield defer.DeferredList([
- self.appservice_api.query_3pe(service, kind, protocol, fields)
+ results = yield preserve_context_over_deferred(defer.DeferredList([
+ preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
for service in services
- ], consumeErrors=True)
+ ], consumeErrors=True))
ret = []
for (success, result) in results:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 328f8f4842..01a761715b 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -26,7 +26,9 @@ from synapse.api.errors import (
from synapse.api.constants import EventTypes, Membership, RejectedReason
from synapse.events.validator import EventValidator
from synapse.util import unwrapFirstError
-from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util.logcontext import (
+ PreserveLoggingContext, preserve_fn, preserve_context_over_deferred
+)
from synapse.util.logutils import log_function
from synapse.util.async import run_on_reactor
from synapse.util.frozenutils import unfreeze
@@ -361,9 +363,9 @@ class FederationHandler(BaseHandler):
missing_auth - failed_to_fetch
)
- results = yield defer.gatherResults(
+ results = yield preserve_context_over_deferred(defer.gatherResults(
[
- self.replication_layer.get_pdu(
+ preserve_fn(self.replication_layer.get_pdu)(
[dest],
event_id,
outlier=True,
@@ -372,10 +374,10 @@ class FederationHandler(BaseHandler):
for event_id in missing_auth - failed_to_fetch
],
consumeErrors=True
- ).addErrback(unwrapFirstError)
- auth_events.update({a.event_id: a for a in results})
+ )).addErrback(unwrapFirstError)
+ auth_events.update({a.event_id: a for a in results if a})
required_auth.update(
- a_id for event in results for a_id, _ in event.auth_events
+ a_id for event in results for a_id, _ in event.auth_events if event
)
missing_auth = required_auth - set(auth_events)
@@ -552,10 +554,10 @@ class FederationHandler(BaseHandler):
event_ids = list(extremities.keys())
- states = yield defer.gatherResults([
- self.state_handler.resolve_state_groups(room_id, [e])
+ states = yield preserve_context_over_deferred(defer.gatherResults([
+ preserve_fn(self.state_handler.resolve_state_groups)(room_id, [e])
for e in event_ids
- ])
+ ]))
states = dict(zip(event_ids, [s[1] for s in states]))
for e_id, _ in sorted_extremeties_tuple:
@@ -1166,9 +1168,9 @@ class FederationHandler(BaseHandler):
a bunch of outliers, but not a chunk of individual events that depend
on each other for state calculations.
"""
- contexts = yield defer.gatherResults(
+ contexts = yield preserve_context_over_deferred(defer.gatherResults(
[
- self._prep_event(
+ preserve_fn(self._prep_event)(
origin,
ev_info["event"],
state=ev_info.get("state"),
@@ -1176,7 +1178,7 @@ class FederationHandler(BaseHandler):
)
for ev_info in event_infos
]
- )
+ ))
yield self.store.persist_events(
[
@@ -1460,9 +1462,9 @@ class FederationHandler(BaseHandler):
# Do auth conflict res.
logger.info("Different auth: %s", different_auth)
- different_events = yield defer.gatherResults(
+ different_events = yield preserve_context_over_deferred(defer.gatherResults(
[
- self.store.get_event(
+ preserve_fn(self.store.get_event)(
d,
allow_none=True,
allow_rejected=False,
@@ -1471,7 +1473,7 @@ class FederationHandler(BaseHandler):
if d in have_events and not have_events[d]
],
consumeErrors=True
- ).addErrback(unwrapFirstError)
+ )).addErrback(unwrapFirstError)
if different_events:
local_view = dict(auth_events)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index dc76d34a52..4c3cd9d12e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -28,7 +28,8 @@ from synapse.types import (
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute, run_on_reactor, ReadWriteLock
from synapse.util.caches.snapshot_cache import SnapshotCache
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
+from synapse.util.metrics import measure_func
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@@ -502,15 +503,17 @@ class MessageHandler(BaseHandler):
lambda states: states[event.event_id]
)
- (messages, token), current_state = yield defer.gatherResults(
- [
- self.store.get_recent_events_for_room(
- event.room_id,
- limit=limit,
- end_token=room_end_token,
- ),
- deferred_room_state,
- ]
+ (messages, token), current_state = yield preserve_context_over_deferred(
+ defer.gatherResults(
+ [
+ preserve_fn(self.store.get_recent_events_for_room)(
+ event.room_id,
+ limit=limit,
+ end_token=room_end_token,
+ ),
+ deferred_room_state,
+ ]
+ )
).addErrback(unwrapFirstError)
messages = yield filter_events_for_client(
@@ -719,9 +722,9 @@ class MessageHandler(BaseHandler):
presence, receipts, (messages, token) = yield defer.gatherResults(
[
- get_presence(),
- get_receipts(),
- self.store.get_recent_events_for_room(
+ preserve_fn(get_presence)(),
+ preserve_fn(get_receipts)(),
+ preserve_fn(self.store.get_recent_events_for_room)(
room_id,
limit=limit,
end_token=now_token.room_key,
@@ -755,6 +758,7 @@ class MessageHandler(BaseHandler):
defer.returnValue(ret)
+ @measure_func("_create_new_client_event")
@defer.inlineCallbacks
def _create_new_client_event(self, builder, prev_event_ids=None):
if prev_event_ids:
@@ -806,6 +810,7 @@ class MessageHandler(BaseHandler):
(event, context,)
)
+ @measure_func("handle_new_client_event")
@defer.inlineCallbacks
def handle_new_client_event(
self,
@@ -934,7 +939,7 @@ class MessageHandler(BaseHandler):
@defer.inlineCallbacks
def _notify():
yield run_on_reactor()
- self.notifier.on_new_room_event(
+ yield self.notifier.on_new_room_event(
event, event_stream_id, max_stream_id,
extra_users=extra_users
)
@@ -944,6 +949,6 @@ class MessageHandler(BaseHandler):
# If invite, remove room_state from unsigned before sending.
event.unsigned.pop("invite_room_state", None)
- federation_handler.handle_new_event(
+ preserve_fn(federation_handler.handle_new_event)(
event, destinations=destinations,
)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 4709112a0c..8b17632fdc 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -59,10 +59,13 @@ class RoomMemberHandler(BaseHandler):
prev_event_ids,
txn_id=None,
ratelimit=True,
+ content=None,
):
+ if content is None:
+ content = {}
msg_handler = self.hs.get_handlers().message_handler
- content = {"membership": membership}
+ content["membership"] = membership
if requester.is_guest:
content["kind"] = "guest"
@@ -140,6 +143,7 @@ class RoomMemberHandler(BaseHandler):
remote_room_hosts=None,
third_party_signed=None,
ratelimit=True,
+ content=None,
):
key = (room_id,)
@@ -153,6 +157,7 @@ class RoomMemberHandler(BaseHandler):
remote_room_hosts=remote_room_hosts,
third_party_signed=third_party_signed,
ratelimit=ratelimit,
+ content=content,
)
defer.returnValue(result)
@@ -168,7 +173,11 @@ class RoomMemberHandler(BaseHandler):
remote_room_hosts=None,
third_party_signed=None,
ratelimit=True,
+ content=None,
):
+ if content is None:
+ content = {}
+
effective_membership_state = action
if action in ["kick", "unban"]:
effective_membership_state = "leave"
@@ -218,7 +227,7 @@ class RoomMemberHandler(BaseHandler):
if inviter and not self.hs.is_mine(inviter):
remote_room_hosts.append(inviter.domain)
- content = {"membership": Membership.JOIN}
+ content["membership"] = Membership.JOIN
profile = self.hs.get_handlers().profile_handler
content["displayname"] = yield profile.get_displayname(target)
@@ -272,6 +281,7 @@ class RoomMemberHandler(BaseHandler):
txn_id=txn_id,
ratelimit=ratelimit,
prev_event_ids=latest_event_ids,
+ content=content,
)
@defer.inlineCallbacks
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 5589296c09..46181984c0 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -16,7 +16,9 @@
from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import (
+ PreserveLoggingContext, preserve_fn, preserve_context_over_deferred,
+)
from synapse.util.metrics import Measure
from synapse.types import UserID
@@ -169,13 +171,13 @@ class TypingHandler(object):
deferreds = []
for domain in domains:
if domain == self.server_name:
- self._push_update_local(
+ preserve_fn(self._push_update_local)(
room_id=room_id,
user_id=user_id,
typing=typing
)
else:
- deferreds.append(self.federation.send_edu(
+ deferreds.append(preserve_fn(self.federation.send_edu)(
destination=domain,
edu_type="m.typing",
content={
@@ -185,7 +187,9 @@ class TypingHandler(object):
},
))
- yield defer.DeferredList(deferreds, consumeErrors=True)
+ yield preserve_context_over_deferred(
+ defer.DeferredList(deferreds, consumeErrors=True)
+ )
@defer.inlineCallbacks
def _recv_edu(self, origin, content):
diff --git a/synapse/notifier.py b/synapse/notifier.py
index c48024096d..b86648f5e4 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -19,7 +19,7 @@ from synapse.api.errors import AuthError
from synapse.util.logutils import log_function
from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
from synapse.util.metrics import Measure
from synapse.types import StreamToken
from synapse.visibility import filter_events_for_client
@@ -174,6 +174,7 @@ class Notifier(object):
lambda: len(self.user_to_user_stream),
)
+ @preserve_fn
def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
extra_users=[]):
""" Used by handlers to inform the notifier something has happened
@@ -195,6 +196,7 @@ class Notifier(object):
self.notify_replication()
+ @preserve_fn
def _notify_pending_new_room_events(self, max_room_stream_id):
"""Notify for the room events that were queued waiting for a previous
event to be persisted.
@@ -212,6 +214,7 @@ class Notifier(object):
else:
self._on_new_room_event(event, room_stream_id, extra_users)
+ @preserve_fn
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
@@ -226,6 +229,7 @@ class Notifier(object):
rooms=[event.room_id],
)
+ @preserve_fn
def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
""" Used to inform listeners that something has happend event wise.
@@ -252,6 +256,7 @@ class Notifier(object):
self.notify_replication()
+ @preserve_fn
def on_new_replication_data(self):
"""Used to inform replication listeners that something has happend
without waking up any of the normal user event streams"""
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index d555a33e9a..becb8ef1ae 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -17,14 +17,15 @@ from twisted.internet import defer
from synapse.util.presentable_names import (
calculate_room_name, name_from_member_event
)
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
@defer.inlineCallbacks
def get_badge_count(store, user_id):
- invites, joins = yield defer.gatherResults([
- store.get_invited_rooms_for_user(user_id),
- store.get_rooms_for_user(user_id),
- ], consumeErrors=True)
+ invites, joins = yield preserve_context_over_deferred(defer.gatherResults([
+ preserve_fn(store.get_invited_rooms_for_user)(user_id),
+ preserve_fn(store.get_rooms_for_user)(user_id),
+ ], consumeErrors=True))
my_receipts_by_room = yield store.get_receipts_for_user(
user_id, "m.read",
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 54c0f1b849..3837be523d 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -17,7 +17,7 @@
from twisted.internet import defer
import pusher
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.util.async import run_on_reactor
import logging
@@ -130,10 +130,12 @@ class PusherPool:
if u in self.pushers:
for p in self.pushers[u].values():
deferreds.append(
- p.on_new_notifications(min_stream_id, max_stream_id)
+ preserve_fn(p.on_new_notifications)(
+ min_stream_id, max_stream_id
+ )
)
- yield defer.gatherResults(deferreds)
+ yield preserve_context_over_deferred(defer.gatherResults(deferreds))
except:
logger.exception("Exception in pusher on_new_notifications")
@@ -155,10 +157,10 @@ class PusherPool:
if u in self.pushers:
for p in self.pushers[u].values():
deferreds.append(
- p.on_new_receipts(min_stream_id, max_stream_id)
+ preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id)
)
- yield defer.gatherResults(deferreds)
+ yield preserve_context_over_deferred(defer.gatherResults(deferreds))
except:
logger.exception("Exception in pusher on_new_receipts")
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 89c3895118..0d81757010 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -268,6 +268,7 @@ class JoinRoomAliasServlet(ClientV1RestServlet):
action="join",
txn_id=txn_id,
remote_room_hosts=remote_room_hosts,
+ content=content,
third_party_signed=content.get("third_party_signed", None),
)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 943f5676a3..2121bd75ea 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -403,10 +403,9 @@ class RegisterRestServlet(RestServlet):
# register the user's device
device_id = params.get("device_id")
initial_display_name = params.get("initial_device_display_name")
- device_id = self.device_handler.check_device_registered(
+ return self.device_handler.check_device_registered(
user_id, device_id, initial_display_name
)
- return device_id
@defer.inlineCallbacks
def _do_guest_registration(self):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 97aef25321..57e5005285 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -20,7 +20,9 @@ from synapse.events import FrozenEvent, USE_FROZEN_DICTS
from synapse.events.utils import prune_event
from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
+from synapse.util.logcontext import (
+ preserve_fn, PreserveLoggingContext, preserve_context_over_deferred
+)
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
from synapse.api.constants import EventTypes
@@ -202,7 +204,7 @@ class EventsStore(SQLBaseStore):
deferreds = []
for room_id, evs_ctxs in partitioned.items():
- d = self._event_persist_queue.add_to_queue(
+ d = preserve_fn(self._event_persist_queue.add_to_queue)(
room_id, evs_ctxs,
backfilled=backfilled,
current_state=None,
@@ -212,7 +214,9 @@ class EventsStore(SQLBaseStore):
for room_id in partitioned.keys():
self._maybe_start_persisting(room_id)
- return defer.gatherResults(deferreds, consumeErrors=True)
+ return preserve_context_over_deferred(
+ defer.gatherResults(deferreds, consumeErrors=True)
+ )
@defer.inlineCallbacks
@log_function
@@ -225,7 +229,7 @@ class EventsStore(SQLBaseStore):
self._maybe_start_persisting(event.room_id)
- yield deferred
+ yield preserve_context_over_deferred(deferred)
max_persisted_id = yield self._stream_id_gen.get_current_token()
defer.returnValue((event.internal_metadata.stream_ordering, max_persisted_id))
@@ -1088,7 +1092,7 @@ class EventsStore(SQLBaseStore):
if not allow_rejected:
rows[:] = [r for r in rows if not r["rejects"]]
- res = yield defer.gatherResults(
+ res = yield preserve_context_over_deferred(defer.gatherResults(
[
preserve_fn(self._get_event_from_row)(
row["internal_metadata"], row["json"], row["redacts"],
@@ -1097,7 +1101,7 @@ class EventsStore(SQLBaseStore):
for row in rows
],
consumeErrors=True
- )
+ ))
defer.returnValue({
e.event.event_id: e
diff --git a/synapse/storage/schema/delta/34/received_txn_purge.py b/synapse/storage/schema/delta/34/received_txn_purge.py
new file mode 100644
index 0000000000..033144341c
--- /dev/null
+++ b/synapse/storage/schema/delta/34/received_txn_purge.py
@@ -0,0 +1,32 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage.engines import PostgresEngine
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+ if isinstance(database_engine, PostgresEngine):
+ cur.execute("TRUNCATE received_transactions")
+ else:
+ cur.execute("DELETE FROM received_transactions")
+
+ cur.execute("CREATE INDEX received_transactions_ts ON received_transactions(ts)")
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
+ pass
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 862c5c3ea1..0577a0525b 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -39,7 +39,7 @@ from ._base import SQLBaseStore
from synapse.util.caches.descriptors import cached
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
import logging
@@ -234,12 +234,12 @@ class StreamStore(SQLBaseStore):
results = {}
room_ids = list(room_ids)
for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
- res = yield defer.gatherResults([
+ res = yield preserve_context_over_deferred(defer.gatherResults([
preserve_fn(self.get_room_events_stream_for_room)(
room_id, from_key, to_key, limit, order=order,
)
for room_id in rm_ids
- ])
+ ]))
results.update(dict(zip(rm_ids, res)))
defer.returnValue(results)
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index 6258ff1725..58d4de4f1d 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -62,10 +62,9 @@ class TransactionStore(SQLBaseStore):
self.last_transaction = {}
reactor.addSystemEventTrigger("before", "shutdown", self._persist_in_mem_txns)
- hs.get_clock().looping_call(
- self._persist_in_mem_txns,
- 1000,
- )
+ self._clock.looping_call(self._persist_in_mem_txns, 1000)
+
+ self._clock.looping_call(self._cleanup_transactions, 30 * 60 * 1000)
def get_received_txn_response(self, transaction_id, origin):
"""For an incoming transaction from a given origin, check if we have
@@ -127,6 +126,7 @@ class TransactionStore(SQLBaseStore):
"origin": origin,
"response_code": code,
"response_json": buffer(encode_canonical_json(response_dict)),
+ "ts": self._clock.time_msec(),
},
or_ignore=True,
desc="set_received_txn_response",
@@ -383,3 +383,12 @@ class TransactionStore(SQLBaseStore):
yield self.runInteraction("_persist_in_mem_txns", f)
except:
logger.exception("Failed to persist transactions!")
+
+ def _cleanup_transactions(self):
+ now = self._clock.time_msec()
+ month_ago = now - 30 * 24 * 60 * 60 * 1000
+
+ def _cleanup_transactions_txn(txn):
+ txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))
+
+ return self.runInteraction("_persist_in_mem_txns", _cleanup_transactions_txn)
diff --git a/synapse/util/async.py b/synapse/util/async.py
index c84b23ff46..347fb1e380 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -146,10 +146,10 @@ def concurrently_execute(func, args, limit):
except StopIteration:
pass
- return defer.gatherResults([
+ return preserve_context_over_deferred(defer.gatherResults([
preserve_fn(_concurrently_execute_inner)()
for _ in xrange(limit)
- ], consumeErrors=True).addErrback(unwrapFirstError)
+ ], consumeErrors=True)).addErrback(unwrapFirstError)
class Linearizer(object):
@@ -181,7 +181,8 @@ class Linearizer(object):
self.key_to_defer[key] = new_defer
if current_defer:
- yield preserve_context_over_deferred(current_defer)
+ with PreserveLoggingContext():
+ yield current_defer
@contextmanager
def _ctx_manager():
@@ -264,7 +265,7 @@ class ReadWriteLock(object):
curr_readers.clear()
self.key_to_current_writer[key] = new_defer
- yield defer.gatherResults(to_wait_on)
+ yield preserve_context_over_deferred(defer.gatherResults(to_wait_on))
@contextmanager
def _ctx_manager():
diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py
index 7a87045f87..6c83eb213d 100644
--- a/synapse/util/logcontext.py
+++ b/synapse/util/logcontext.py
@@ -297,12 +297,13 @@ def preserve_context_over_fn(fn, *args, **kwargs):
return res
-def preserve_context_over_deferred(deferred):
+def preserve_context_over_deferred(deferred, context=None):
"""Given a deferred wrap it such that any callbacks added later to it will
be invoked with the current context.
"""
- current_context = LoggingContext.current_context()
- d = _PreservingContextDeferred(current_context)
+ if context is None:
+ context = LoggingContext.current_context()
+ d = _PreservingContextDeferred(context)
deferred.chainDeferred(d)
return d
@@ -316,7 +317,13 @@ def preserve_fn(f):
def g(*args, **kwargs):
with PreserveLoggingContext(current):
- return f(*args, **kwargs)
+ res = f(*args, **kwargs)
+ if isinstance(res, defer.Deferred):
+ return preserve_context_over_deferred(
+ res, context=LoggingContext.sentinel
+ )
+ else:
+ return res
return g
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 948ad51772..cc12c0a23d 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -17,7 +17,7 @@ from twisted.internet import defer
from synapse.api.constants import Membership, EventTypes
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
import logging
@@ -55,12 +55,12 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state):
given events
events ([synapse.events.EventBase]): list of events to filter
"""
- forgotten = yield defer.gatherResults([
+ forgotten = yield preserve_context_over_deferred(defer.gatherResults([
preserve_fn(store.who_forgot_in_room)(
room_id,
)
for room_id in frozenset(e.room_id for e in events)
- ], consumeErrors=True)
+ ], consumeErrors=True))
# Set of membership event_ids that have been forgotten
event_id_forgotten = frozenset(
|