diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index 694aed3a7d..ceb03ce6c2 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -135,7 +135,7 @@ class Keyring(object):
time_now_ms = self.clock.time_msec()
- self.store.store_server_certificate(
+ yield self.store.store_server_certificate(
server_name,
server_name,
time_now_ms,
@@ -143,7 +143,7 @@ class Keyring(object):
)
for key_id, key in verify_keys.items():
- self.store.store_server_verify_key(
+ yield self.store.store_server_verify_key(
server_name, server_name, time_now_ms, key
)
diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py
index 65a53ae17c..996b8ea5bf 100644
--- a/synapse/federation/replication.py
+++ b/synapse/federation/replication.py
@@ -24,6 +24,7 @@ from .units import Transaction, Edu
from .persistence import TransactionActions
from synapse.util.logutils import log_function
+from synapse.util.logcontext import PreserveLoggingContext
import logging
@@ -319,19 +320,20 @@ class ReplicationLayer(object):
logger.debug("[%s] Transacition is new", transaction.transaction_id)
- dl = []
- for pdu in pdu_list:
- dl.append(self._handle_new_pdu(transaction.origin, pdu))
+ with PreserveLoggingContext():
+ dl = []
+ for pdu in pdu_list:
+ dl.append(self._handle_new_pdu(transaction.origin, pdu))
- if hasattr(transaction, "edus"):
- for edu in [Edu(**x) for x in transaction.edus]:
- self.received_edu(
- transaction.origin,
- edu.edu_type,
- edu.content
- )
+ if hasattr(transaction, "edus"):
+ for edu in [Edu(**x) for x in transaction.edus]:
+ self.received_edu(
+ transaction.origin,
+ edu.edu_type,
+ edu.content
+ )
- results = yield defer.DeferredList(dl)
+ results = yield defer.DeferredList(dl)
ret = []
for r in results:
@@ -649,7 +651,8 @@ class _TransactionQueue(object):
(pdu, deferred, order)
)
- self._attempt_new_transaction(destination)
+ with PreserveLoggingContext():
+ self._attempt_new_transaction(destination)
deferreds.append(deferred)
@@ -669,7 +672,9 @@ class _TransactionQueue(object):
deferred.errback(failure)
else:
logger.exception("Failed to send edu", failure)
- self._attempt_new_transaction(destination).addErrback(eb)
+
+ with PreserveLoggingContext():
+ self._attempt_new_transaction(destination).addErrback(eb)
return deferred
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index 30c6733063..d53cd3df3e 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -112,7 +112,7 @@ class BaseHandler(object):
event.destinations = list(destinations)
- self.notifier.on_new_room_event(event, extra_users=extra_users)
+ yield self.notifier.on_new_room_event(event, extra_users=extra_users)
federation_handler = self.hs.get_handlers().federation_handler
yield federation_handler.handle_new_event(event, snapshot)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 4993c92b74..d59221a4fb 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -56,7 +56,7 @@ class EventStreamHandler(BaseHandler):
self.clock.cancel_call_later(
self._stop_timer_per_user.pop(auth_user))
else:
- self.distributor.fire(
+ yield self.distributor.fire(
"started_user_eventstream", auth_user
)
self._streams_per_user[auth_user] += 1
@@ -65,8 +65,10 @@ class EventStreamHandler(BaseHandler):
pagin_config.from_token = None
rm_handler = self.hs.get_handlers().room_member_handler
+ logger.debug("BETA")
room_ids = yield rm_handler.get_rooms_for_user(auth_user)
+ logger.debug("ALPHA")
with PreserveLoggingContext():
events, tokens = yield self.notifier.get_events_for(
auth_user, room_ids, pagin_config, timeout
@@ -93,7 +95,7 @@ class EventStreamHandler(BaseHandler):
logger.debug(
"_later stopped_user_eventstream %s", auth_user
)
- self.distributor.fire(
+ yield self.distributor.fire(
"stopped_user_eventstream", auth_user
)
del self._stop_timer_per_user[auth_user]
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 492005a170..e8fb7eae58 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -209,7 +209,7 @@ class FederationHandler(BaseHandler):
if event.type == RoomMemberEvent.TYPE:
if event.membership == Membership.JOIN:
user = self.hs.parse_userid(event.state_key)
- self.distributor.fire(
+ yield self.distributor.fire(
"user_joined_room", user=user, room_id=event.room_id
)
@@ -414,7 +414,7 @@ class FederationHandler(BaseHandler):
if event.type == RoomMemberEvent.TYPE:
if event.membership == Membership.JOIN:
user = self.hs.parse_userid(event.state_key)
- self.distributor.fire(
+ yield self.distributor.fire(
"user_joined_room", user=user, room_id=event.room_id
)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index de70486b29..f460657f31 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.api.constants import Membership
from synapse.api.errors import RoomError
from synapse.streams.config import PaginationConfig
+from synapse.util.logcontext import PreserveLoggingContext
from ._base import BaseHandler
import logging
@@ -86,9 +87,10 @@ class MessageHandler(BaseHandler):
event, snapshot, suppress_auth=suppress_auth
)
- self.hs.get_handlers().presence_handler.bump_presence_active_time(
- user
- )
+ with PreserveLoggingContext():
+ self.hs.get_handlers().presence_handler.bump_presence_active_time(
+ user
+ )
@defer.inlineCallbacks
def get_messages(self, user_id=None, room_id=None, pagin_config=None,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index fcc92a8e32..b55d589daf 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -19,6 +19,7 @@ from synapse.api.errors import SynapseError, AuthError
from synapse.api.constants import PresenceState
from synapse.util.logutils import log_function
+from synapse.util.logcontext import PreserveLoggingContext
from ._base import BaseHandler
@@ -142,7 +143,7 @@ class PresenceHandler(BaseHandler):
return UserPresenceCache()
def registered_user(self, user):
- self.store.create_presence(user.localpart)
+ return self.store.create_presence(user.localpart)
@defer.inlineCallbacks
def is_presence_visible(self, observer_user, observed_user):
@@ -241,14 +242,12 @@ class PresenceHandler(BaseHandler):
was_level = self.STATE_LEVELS[statuscache.get_state()["presence"]]
now_level = self.STATE_LEVELS[state["presence"]]
- yield defer.DeferredList([
- self.store.set_presence_state(
- target_user.localpart, state_to_store
- ),
- self.distributor.fire(
- "collect_presencelike_data", target_user, state
- ),
- ])
+ yield self.store.set_presence_state(
+ target_user.localpart, state_to_store
+ )
+ yield self.distributor.fire(
+ "collect_presencelike_data", target_user, state
+ )
if now_level > was_level:
state["last_active"] = self.clock.time_msec()
@@ -256,14 +255,15 @@ class PresenceHandler(BaseHandler):
now_online = state["presence"] != PresenceState.OFFLINE
was_polling = target_user in self._user_cachemap
- if now_online and not was_polling:
- self.start_polling_presence(target_user, state=state)
- elif not now_online and was_polling:
- self.stop_polling_presence(target_user)
+ with PreserveLoggingContext():
+ if now_online and not was_polling:
+ self.start_polling_presence(target_user, state=state)
+ elif not now_online and was_polling:
+ self.stop_polling_presence(target_user)
- # TODO(paul): perform a presence push as part of start/stop poll so
- # we don't have to do this all the time
- self.changed_presencelike_data(target_user, state)
+ # TODO(paul): perform a presence push as part of start/stop poll so
+ # we don't have to do this all the time
+ self.changed_presencelike_data(target_user, state)
def bump_presence_active_time(self, user, now=None):
if now is None:
@@ -277,7 +277,7 @@ class PresenceHandler(BaseHandler):
self._user_cachemap_latest_serial += 1
statuscache.update(state, serial=self._user_cachemap_latest_serial)
- self.push_presence(user, statuscache=statuscache)
+ return self.push_presence(user, statuscache=statuscache)
@log_function
def started_user_eventstream(self, user):
@@ -381,8 +381,10 @@ class PresenceHandler(BaseHandler):
yield self.store.set_presence_list_accepted(
observer_user.localpart, observed_user.to_string()
)
-
- self.start_polling_presence(observer_user, target_user=observed_user)
+ with PreserveLoggingContext():
+ self.start_polling_presence(
+ observer_user, target_user=observed_user
+ )
@defer.inlineCallbacks
def deny_presence(self, observed_user, observer_user):
@@ -401,7 +403,10 @@ class PresenceHandler(BaseHandler):
observer_user.localpart, observed_user.to_string()
)
- self.stop_polling_presence(observer_user, target_user=observed_user)
+ with PreserveLoggingContext():
+ self.stop_polling_presence(
+ observer_user, target_user=observed_user
+ )
@defer.inlineCallbacks
def get_presence_list(self, observer_user, accepted=None):
@@ -710,7 +715,8 @@ class PresenceHandler(BaseHandler):
if not self._remote_sendmap[user]:
del self._remote_sendmap[user]
- yield defer.DeferredList(deferreds)
+ with PreserveLoggingContext():
+ yield defer.DeferredList(deferreds)
@defer.inlineCallbacks
def push_update_to_local_and_remote(self, observed_user, statuscache,
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 7853bf5098..814b3b68fe 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError, CodeMessageException
from synapse.api.constants import Membership
+from synapse.util.logcontext import PreserveLoggingContext
from ._base import BaseHandler
@@ -46,7 +47,7 @@ class ProfileHandler(BaseHandler):
)
def registered_user(self, user):
- self.store.create_profile(user.localpart)
+ return self.store.create_profile(user.localpart)
@defer.inlineCallbacks
def get_displayname(self, target_user):
@@ -152,13 +153,14 @@ class ProfileHandler(BaseHandler):
if not user.is_mine:
defer.returnValue(None)
- (displayname, avatar_url) = yield defer.gatherResults(
- [
- self.store.get_profile_displayname(user.localpart),
- self.store.get_profile_avatar_url(user.localpart),
- ],
- consumeErrors=True
- )
+ with PreserveLoggingContext():
+ (displayname, avatar_url) = yield defer.gatherResults(
+ [
+ self.store.get_profile_displayname(user.localpart),
+ self.store.get_profile_avatar_url(user.localpart),
+ ],
+ consumeErrors=True
+ )
state["displayname"] = displayname
state["avatar_url"] = avatar_url
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 7df9d9b82d..c59ac1a3c8 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -69,7 +69,7 @@ class RegistrationHandler(BaseHandler):
password_hash=password_hash
)
- self.distributor.fire("registered_user", user)
+ yield self.distributor.fire("registered_user", user)
else:
# autogen a random user ID
attempts = 0
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 7d9458e1d0..7252051744 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -178,7 +178,7 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
result["room_alias"] = room_alias.to_string()
- directory_handler.send_room_alias_update_event(user_id, room_id)
+ yield directory_handler.send_room_alias_update_event(user_id, room_id)
defer.returnValue(result)
@@ -480,7 +480,7 @@ class RoomMemberHandler(BaseHandler):
)
user = self.hs.parse_userid(event.user_id)
- self.distributor.fire(
+ yield self.distributor.fire(
"user_joined_room", user=user, room_id=room_id
)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index c310a9fed6..0c8ca6ec66 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.async import run_on_reactor
import logging
@@ -96,6 +97,7 @@ class Notifier(object):
listening to the room, and any listeners for the users in the
`extra_users` param.
"""
+ yield run_on_reactor()
room_id = event.room_id
room_source = self.event_sources.sources["room"]
@@ -143,6 +145,7 @@ class Notifier(object):
Will wake up all listeners for the given users and rooms.
"""
+ yield run_on_reactor()
presence_source = self.event_sources.sources["presence"]
listeners = set()
diff --git a/synapse/rest/presence.py b/synapse/rest/presence.py
index 138cc88a05..502ed0d4ca 100644
--- a/synapse/rest/presence.py
+++ b/synapse/rest/presence.py
@@ -117,8 +117,6 @@ class PresenceListRestServlet(RestServlet):
logger.exception("JSON parse error")
raise SynapseError(400, "Unable to parse content")
- deferreds = []
-
if "invite" in content:
for u in content["invite"]:
if not isinstance(u, basestring):
@@ -126,8 +124,9 @@ class PresenceListRestServlet(RestServlet):
if len(u) == 0:
continue
invited_user = self.hs.parse_userid(u)
- deferreds.append(self.handlers.presence_handler.send_invite(
- observer_user=user, observed_user=invited_user))
+ yield self.handlers.presence_handler.send_invite(
+ observer_user=user, observed_user=invited_user
+ )
if "drop" in content:
for u in content["drop"]:
@@ -136,10 +135,9 @@ class PresenceListRestServlet(RestServlet):
if len(u) == 0:
continue
dropped_user = self.hs.parse_userid(u)
- deferreds.append(self.handlers.presence_handler.drop(
- observer_user=user, observed_user=dropped_user))
-
- yield defer.DeferredList(deferreds)
+ yield self.handlers.presence_handler.drop(
+ observer_user=user, observed_user=dropped_user
+ )
defer.returnValue((200, {}))
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 5d4be09a82..2c04a1c5be 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -115,7 +115,6 @@ class SQLBaseStore(object):
"[TXN END] {%s} %f",
name, end - start
)
-
with PreserveLoggingContext():
result = yield self._db_pool.runInteraction(
inner_func, *args, **kwargs
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 93329703a2..c37df59d45 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -177,8 +177,8 @@ class RoomMemberStore(SQLBaseStore):
return self._get_members_query(clause, vals)
def _get_members_query(self, where_clause, where_values):
- return self._db_pool.runInteraction(
- self._get_members_query_txn,
+ return self.runInteraction(
+ "get_members_query", self._get_members_query_txn,
where_clause, where_values
)
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index eddbe5837f..701ccdb781 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from synapse.util.logcontext import PreserveLoggingContext
+
from twisted.internet import defer
import logging
@@ -91,6 +93,7 @@ class Signal(object):
Each observer callable may return a Deferred."""
self.observers.append(observer)
+ @defer.inlineCallbacks
def fire(self, *args, **kwargs):
"""Invokes every callable in the observer list, passing in the args and
kwargs. Exceptions thrown by observers are logged but ignored. It is
@@ -98,22 +101,24 @@ class Signal(object):
Returns a Deferred that will complete when all the observers have
completed."""
- deferreds = []
- for observer in self.observers:
- d = defer.maybeDeferred(observer, *args, **kwargs)
-
- def eb(failure):
- logger.warning(
- "%s signal observer %s failed: %r",
- self.name, observer, failure,
- exc_info=(
- failure.type,
- failure.value,
- failure.getTracebackObject()))
- if not self.suppress_failures:
- raise failure
- deferreds.append(d.addErrback(eb))
-
- return defer.DeferredList(
- deferreds, fireOnOneErrback=not self.suppress_failures
- )
+ with PreserveLoggingContext():
+ deferreds = []
+ for observer in self.observers:
+ d = defer.maybeDeferred(observer, *args, **kwargs)
+
+ def eb(failure):
+ logger.warning(
+ "%s signal observer %s failed: %r",
+ self.name, observer, failure,
+ exc_info=(
+ failure.type,
+ failure.value,
+ failure.getTracebackObject()))
+ if not self.suppress_failures:
+ raise failure
+ deferreds.append(d.addErrback(eb))
+
+ result = yield defer.DeferredList(
+ deferreds, fireOnOneErrback=not self.suppress_failures
+ )
+ defer.returnValue(result)
|