diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e484061cc0..969e588e73 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -14,9 +14,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-import sys
-import six
from six import iteritems, itervalues, string_types
from canonicaljson import encode_canonical_json, json
@@ -37,6 +35,7 @@ from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.storage.state import StateFilter
from synapse.types import RoomAlias, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
@@ -82,7 +81,7 @@ class MessageHandler(object):
elif membership == Membership.LEAVE:
key = (event_type, state_key)
room_state = yield self.store.get_state_for_events(
- [membership_event_id], [key]
+ [membership_event_id], StateFilter.from_types([key])
)
data = room_state[membership_event_id].get(key)
@@ -90,7 +89,7 @@ class MessageHandler(object):
@defer.inlineCallbacks
def get_state_events(
- self, user_id, room_id, types=None, filtered_types=None,
+ self, user_id, room_id, state_filter=StateFilter.all(),
at_token=None, is_guest=False,
):
"""Retrieve all state events for a given room. If the user is
@@ -102,13 +101,8 @@ class MessageHandler(object):
Args:
user_id(str): The user requesting state events.
room_id(str): The room ID to get all state events from.
- types(list[(str, str|None)]|None): List of (type, state_key) tuples
- which are used to filter the state fetched. If `state_key` is None,
- all events are returned of the given type.
- May be None, which matches any key.
- filtered_types(list[str]|None): Only apply filtering via `types` to this
- list of event types. Other types of events are returned unfiltered.
- If None, `types` filtering is applied to all events.
+ state_filter (StateFilter): The state filter used to fetch state
+ from the database.
at_token(StreamToken|None): the stream token of the at which we are requesting
the stats. If the user is not allowed to view the state as of that
stream token, we raise a 403 SynapseError. If None, returns the current
@@ -141,7 +135,7 @@ class MessageHandler(object):
event = last_events[0]
if visible_events:
room_state = yield self.store.get_state_for_events(
- [event.event_id], types, filtered_types=filtered_types,
+ [event.event_id], state_filter=state_filter,
)
room_state = room_state[event.event_id]
else:
@@ -160,12 +154,12 @@ class MessageHandler(object):
if membership == Membership.JOIN:
state_ids = yield self.store.get_filtered_current_state_ids(
- room_id, types, filtered_types=filtered_types,
+ room_id, state_filter=state_filter,
)
room_state = yield self.store.get_events(state_ids.values())
elif membership == Membership.LEAVE:
room_state = yield self.store.get_state_for_events(
- [membership_event_id], types, filtered_types=filtered_types,
+ [membership_event_id], state_filter=state_filter,
)
room_state = room_state[membership_event_id]
@@ -624,6 +618,9 @@ class EventCreationHandler(object):
event, context
)
+ # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
+ # hack around with a try/finally instead.
+ success = False
try:
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
@@ -636,6 +633,7 @@ class EventCreationHandler(object):
ratelimit=ratelimit,
extra_users=extra_users,
)
+ success = True
return
yield self.persist_and_notify_client_event(
@@ -645,17 +643,16 @@ class EventCreationHandler(object):
ratelimit=ratelimit,
extra_users=extra_users,
)
- except: # noqa: E722, as we reraise the exception this is fine.
- # Ensure that we actually remove the entries in the push actions
- # staging area, if we calculated them.
- tp, value, tb = sys.exc_info()
-
- run_in_background(
- self.store.remove_push_actions_from_staging,
- event.event_id,
- )
- six.reraise(tp, value, tb)
+ success = True
+ finally:
+ if not success:
+ # Ensure that we actually remove the entries in the push actions
+ # staging area, if we calculated them.
+ run_in_background(
+ self.store.remove_push_actions_from_staging,
+ event.event_id,
+ )
@defer.inlineCallbacks
def persist_and_notify_client_event(
@@ -778,7 +775,7 @@ class EventCreationHandler(object):
event, context=context
)
- self.pusher_pool.on_new_notifications(
+ yield self.pusher_pool.on_new_notifications(
event_stream_id, max_stream_id,
)
|