diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index b6a8b3aa3b..704181d2d3 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -112,8 +112,9 @@ class BaseHandler(object):
guest_access = event.content.get("guest_access", "forbidden")
if guest_access != "can_join":
if context:
+ current_state_ids = yield context.get_current_state_ids(self.store)
current_state = yield self.store.get_events(
- list(context.current_state_ids.values())
+ list(current_state_ids.values())
)
else:
current_state = yield self.state_handler.get_current_state(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 65f6041b10..14654d59f1 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -21,8 +21,8 @@ import logging
import sys
import six
-from six import iteritems
-from six.moves import http_client
+from six import iteritems, itervalues
+from six.moves import http_client, zip
from signedjson.key import decode_verify_key_bytes
from signedjson.sign import verify_signed_json
@@ -486,7 +486,10 @@ class FederationHandler(BaseHandler):
# joined the room. Don't bother if the user is just
# changing their profile info.
newly_joined = True
- prev_state_id = context.prev_state_ids.get(
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ prev_state_id = prev_state_ids.get(
(event.type, event.state_key)
)
if prev_state_id:
@@ -731,7 +734,7 @@ class FederationHandler(BaseHandler):
"""
joined_users = [
(state_key, int(event.depth))
- for (e_type, state_key), event in state.iteritems()
+ for (e_type, state_key), event in iteritems(state)
if e_type == EventTypes.Member
and event.membership == Membership.JOIN
]
@@ -748,7 +751,7 @@ class FederationHandler(BaseHandler):
except Exception:
pass
- return sorted(joined_domains.iteritems(), key=lambda d: d[1])
+ return sorted(joined_domains.items(), key=lambda d: d[1])
curr_domains = get_domains_from_state(curr_state)
@@ -811,7 +814,7 @@ class FederationHandler(BaseHandler):
tried_domains = set(likely_domains)
tried_domains.add(self.server_name)
- event_ids = list(extremities.iterkeys())
+ event_ids = list(extremities.keys())
logger.debug("calling resolve_state_groups in _maybe_backfill")
resolve = logcontext.preserve_fn(
@@ -827,15 +830,15 @@ class FederationHandler(BaseHandler):
states = dict(zip(event_ids, [s.state for s in states]))
state_map = yield self.store.get_events(
- [e_id for ids in states.itervalues() for e_id in ids.itervalues()],
+ [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
get_prev_content=False
)
states = {
key: {
k: state_map[e_id]
- for k, e_id in state_dict.iteritems()
+ for k, e_id in iteritems(state_dict)
if e_id in state_map
- } for key, state_dict in states.iteritems()
+ } for key, state_dict in iteritems(states)
}
for e_id, _ in sorted_extremeties_tuple:
@@ -1106,10 +1109,12 @@ class FederationHandler(BaseHandler):
user = UserID.from_string(event.state_key)
yield user_joined_room(self.distributor, user, event.room_id)
- state_ids = list(context.prev_state_ids.values())
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ state_ids = list(prev_state_ids.values())
auth_chain = yield self.store.get_auth_chain(state_ids)
- state = yield self.store.get_events(list(context.prev_state_ids.values()))
+ state = yield self.store.get_events(list(prev_state_ids.values()))
defer.returnValue({
"state": list(state.values()),
@@ -1515,7 +1520,7 @@ class FederationHandler(BaseHandler):
yield self.store.persist_events(
[
(ev_info["event"], context)
- for ev_info, context in itertools.izip(event_infos, contexts)
+ for ev_info, context in zip(event_infos, contexts)
],
backfilled=backfilled,
)
@@ -1635,8 +1640,9 @@ class FederationHandler(BaseHandler):
)
if not auth_events:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -1876,9 +1882,10 @@ class FederationHandler(BaseHandler):
break
if do_resolution:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
# 1. Get what we think is the auth chain.
auth_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids
+ event, prev_state_ids
)
local_auth_chain = yield self.store.get_auth_chain(
auth_ids, include_given=True
@@ -1968,21 +1975,35 @@ class FederationHandler(BaseHandler):
k: a.event_id for k, a in iteritems(auth_events)
if k != event_key
}
- context.current_state_ids = dict(context.current_state_ids)
- context.current_state_ids.update(state_updates)
+ current_state_ids = yield context.get_current_state_ids(self.store)
+ current_state_ids = dict(current_state_ids)
+
+ current_state_ids.update(state_updates)
+
if context.delta_ids is not None:
- context.delta_ids = dict(context.delta_ids)
- context.delta_ids.update(state_updates)
- context.prev_state_ids = dict(context.prev_state_ids)
- context.prev_state_ids.update({
+ delta_ids = dict(context.delta_ids)
+ delta_ids.update(state_updates)
+
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_state_ids = dict(prev_state_ids)
+
+ prev_state_ids.update({
k: a.event_id for k, a in iteritems(auth_events)
})
- context.state_group = yield self.store.store_state_group(
+
+ state_group = yield self.store.store_state_group(
event.event_id,
event.room_id,
prev_group=context.prev_group,
- delta_ids=context.delta_ids,
- current_state_ids=context.current_state_ids,
+ delta_ids=delta_ids,
+ current_state_ids=current_state_ids,
+ )
+
+ yield context.update_state(
+ state_group=state_group,
+ current_state_ids=current_state_ids,
+ prev_state_ids=prev_state_ids,
+ delta_ids=delta_ids,
)
@defer.inlineCallbacks
@@ -2222,7 +2243,8 @@ class FederationHandler(BaseHandler):
event.content["third_party_invite"]["signed"]["token"]
)
original_invite = None
- original_invite_id = context.prev_state_ids.get(key)
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ original_invite_id = prev_state_ids.get(key)
if original_invite_id:
original_invite = yield self.store.get_event(
original_invite_id, allow_none=True
@@ -2264,7 +2286,8 @@ class FederationHandler(BaseHandler):
signed = event.content["third_party_invite"]["signed"]
token = signed["token"]
- invite_event_id = context.prev_state_ids.get(
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ invite_event_id = prev_state_ids.get(
(EventTypes.ThirdPartyInvite, token,)
)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a39b852ceb..7571975c22 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -33,7 +33,7 @@ from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
from synapse.replication.http.send_event import send_event_to_master
from synapse.types import RoomAlias, RoomStreamToken, UserID
-from synapse.util.async import Limiter, ReadWriteLock
+from synapse.util.async import Linearizer, ReadWriteLock
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
@@ -427,7 +427,7 @@ class EventCreationHandler(object):
# We arbitrarily limit concurrent event creation for a room to 5.
# This is to stop us from diverging history *too* much.
- self.limiter = Limiter(max_count=5)
+ self.limiter = Linearizer(max_count=5, name="room_event_creation_limit")
self.action_generator = hs.get_action_generator()
@@ -630,7 +630,8 @@ class EventCreationHandler(object):
If so, returns the version of the event in context.
Otherwise, returns None.
"""
- prev_event_id = context.prev_state_ids.get((event.type, event.state_key))
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ prev_event_id = prev_state_ids.get((event.type, event.state_key))
prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
if not prev_event:
return
@@ -752,8 +753,8 @@ class EventCreationHandler(object):
event = builder.build()
logger.debug(
- "Created event %s with state: %s",
- event.event_id, context.prev_state_ids,
+ "Created event %s",
+ event.event_id,
)
defer.returnValue(
@@ -806,8 +807,9 @@ class EventCreationHandler(object):
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
yield send_event_to_master(
- self.hs.get_clock(),
- self.http_client,
+ clock=self.hs.get_clock(),
+ store=self.store,
+ client=self.http_client,
host=self.config.worker_replication_host,
port=self.config.worker_replication_http_port,
requester=requester,
@@ -884,9 +886,11 @@ class EventCreationHandler(object):
e.sender == event.sender
)
+ current_state_ids = yield context.get_current_state_ids(self.store)
+
state_to_include_ids = [
e_id
- for k, e_id in iteritems(context.current_state_ids)
+ for k, e_id in iteritems(current_state_ids)
if k[0] in self.hs.config.room_invite_state_types
or k == (EventTypes.Member, event.sender)
]
@@ -922,8 +926,9 @@ class EventCreationHandler(object):
)
if event.type == EventTypes.Redaction:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
auth_events_ids = yield self.auth.compute_auth_events(
- event, context.prev_state_ids, for_verification=True,
+ event, prev_state_ids, for_verification=True,
)
auth_events = yield self.store.get_events(auth_events_ids)
auth_events = {
@@ -943,11 +948,13 @@ class EventCreationHandler(object):
"You don't have permission to redact events"
)
- if event.type == EventTypes.Create and context.prev_state_ids:
- raise AuthError(
- 403,
- "Changing the room create event is forbidden",
- )
+ if event.type == EventTypes.Create:
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+ if prev_state_ids:
+ raise AuthError(
+ 403,
+ "Changing the room create event is forbidden",
+ )
(event_stream_id, max_stream_id) = yield self.store.persist_event(
event, context=context
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 00f2e279bc..a832d91809 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -201,7 +201,9 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
)
- prev_member_event_id = context.prev_state_ids.get(
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
+
+ prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, target.to_string()),
None
)
@@ -496,9 +498,10 @@ class RoomMemberHandler(object):
if prev_event is not None:
return
+ prev_state_ids = yield context.get_prev_state_ids(self.store)
if event.membership == Membership.JOIN:
if requester.is_guest:
- guest_can_join = yield self._can_guest_join(context.prev_state_ids)
+ guest_can_join = yield self._can_guest_join(prev_state_ids)
if not guest_can_join:
# This should be an auth check, but guests are a local concept,
# so don't really fit into the general auth process.
@@ -517,7 +520,7 @@ class RoomMemberHandler(object):
ratelimit=ratelimit,
)
- prev_member_event_id = context.prev_state_ids.get(
+ prev_member_event_id = prev_state_ids.get(
(EventTypes.Member, event.state_key),
None
)
|