diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 3038df4ab8..4f9c3c9db8 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -814,17 +814,16 @@ class Auth(object):
return auth_ids
- @log_function
- def _can_send_event(self, event, auth_events):
+ def _get_send_level(self, etype, state_key, auth_events):
key = (EventTypes.PowerLevels, "", )
send_level_event = auth_events.get(key)
send_level = None
if send_level_event:
send_level = send_level_event.content.get("events", {}).get(
- event.type
+ etype
)
if send_level is None:
- if hasattr(event, "state_key"):
+ if state_key is not None:
send_level = send_level_event.content.get(
"state_default", 50
)
@@ -838,6 +837,13 @@ class Auth(object):
else:
send_level = 0
+ return send_level
+
+ @log_function
+ def _can_send_event(self, event, auth_events):
+ send_level = self._get_send_level(
+ event.type, event.get("state_key", None), auth_events
+ )
user_level = self._get_user_power_level(event.user_id, auth_events)
if user_level < send_level:
@@ -982,3 +988,43 @@ class Auth(object):
"You don't have permission to add ops level greater "
"than your own"
)
+
+ @defer.inlineCallbacks
+ def check_can_change_room_list(self, room_id, user):
+ """Check if the user is allowed to edit the room's entry in the
+ published room list.
+
+ Args:
+ room_id (str)
+ user (UserID)
+ """
+
+ is_admin = yield self.is_server_admin(user)
+ if is_admin:
+ defer.returnValue(True)
+
+ user_id = user.to_string()
+ yield self.check_joined_room(room_id, user_id)
+
+ # We currently require the user is a "moderator" in the room. We do this
+ # by checking if they would (theoretically) be able to change the
+ # m.room.aliases events
+ power_level_event = yield self.state.get_current_state(
+ room_id, EventTypes.PowerLevels, ""
+ )
+
+ auth_events = {}
+ if power_level_event:
+ auth_events[(EventTypes.PowerLevels, "")] = power_level_event
+
+ send_level = self._get_send_level(
+ EventTypes.Aliases, "", auth_events
+ )
+ user_level = self._get_user_power_level(user_id, auth_events)
+
+ if user_level < send_level:
+ raise AuthError(
+ 403,
+ "This server requires you to be a moderator in the room to"
+ " edit its room list entry"
+ )
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index abed6b5e6b..23f8b612ae 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -14,6 +14,7 @@
# limitations under the License.
from synapse.util.frozenutils import freeze
+from synapse.util.caches import intern_dict
# Whether we should use frozen_dict in FrozenEvent. Using frozen_dicts prevents
@@ -140,6 +141,10 @@ class FrozenEvent(EventBase):
unsigned = dict(event_dict.pop("unsigned", {}))
+ # We intern these strings because they turn up a lot (especially when
+ # caching).
+ event_dict = intern_dict(event_dict)
+
if USE_FROZEN_DICTS:
frozen_dict = freeze(event_dict)
else:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 83c1f46586..37ee469fa2 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -418,6 +418,7 @@ class FederationClient(FederationBase):
"Failed to make_%s via %s: %s",
membership, destination, e.message
)
+ raise
raise RuntimeError("Failed to send to any server.")
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 76820b924b..429ab6ddec 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -531,7 +531,6 @@ class FederationServer(FederationBase):
yield self.handler.on_receive_pdu(
origin,
pdu,
- backfilled=False,
state=state,
auth_chain=auth_chain,
)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 208bff8d4f..d65a7893d8 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -175,7 +175,7 @@ class BaseFederationServlet(object):
class FederationSendServlet(BaseFederationServlet):
- PATH = "/send/([^/]*)/"
+ PATH = "/send/(?P<transaction_id>[^/]*)/"
def __init__(self, handler, server_name, **kwargs):
super(FederationSendServlet, self).__init__(
@@ -250,7 +250,7 @@ class FederationPullServlet(BaseFederationServlet):
class FederationEventServlet(BaseFederationServlet):
- PATH = "/event/([^/]*)/"
+ PATH = "/event/(?P<event_id>[^/]*)/"
# This is when someone asks for a data item for a given server data_id pair.
def on_GET(self, origin, content, query, event_id):
@@ -258,7 +258,7 @@ class FederationEventServlet(BaseFederationServlet):
class FederationStateServlet(BaseFederationServlet):
- PATH = "/state/([^/]*)/"
+ PATH = "/state/(?P<context>[^/]*)/"
# This is when someone asks for all data for a given context.
def on_GET(self, origin, content, query, context):
@@ -270,7 +270,7 @@ class FederationStateServlet(BaseFederationServlet):
class FederationBackfillServlet(BaseFederationServlet):
- PATH = "/backfill/([^/]*)/"
+ PATH = "/backfill/(?P<context>[^/]*)/"
def on_GET(self, origin, content, query, context):
versions = query["v"]
@@ -285,7 +285,7 @@ class FederationBackfillServlet(BaseFederationServlet):
class FederationQueryServlet(BaseFederationServlet):
- PATH = "/query/([^/]*)"
+ PATH = "/query/(?P<query_type>[^/]*)"
# This is when we receive a server-server Query
def on_GET(self, origin, content, query, query_type):
@@ -296,7 +296,7 @@ class FederationQueryServlet(BaseFederationServlet):
class FederationMakeJoinServlet(BaseFederationServlet):
- PATH = "/make_join/([^/]*)/([^/]*)"
+ PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
@defer.inlineCallbacks
def on_GET(self, origin, content, query, context, user_id):
@@ -305,7 +305,7 @@ class FederationMakeJoinServlet(BaseFederationServlet):
class FederationMakeLeaveServlet(BaseFederationServlet):
- PATH = "/make_leave/([^/]*)/([^/]*)"
+ PATH = "/make_leave/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
@defer.inlineCallbacks
def on_GET(self, origin, content, query, context, user_id):
@@ -314,7 +314,7 @@ class FederationMakeLeaveServlet(BaseFederationServlet):
class FederationSendLeaveServlet(BaseFederationServlet):
- PATH = "/send_leave/([^/]*)/([^/]*)"
+ PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<txid>[^/]*)"
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, room_id, txid):
@@ -323,14 +323,14 @@ class FederationSendLeaveServlet(BaseFederationServlet):
class FederationEventAuthServlet(BaseFederationServlet):
- PATH = "/event_auth/([^/]*)/([^/]*)"
+ PATH = "/event_auth(?P<context>[^/]*)/(?P<event_id>[^/]*)"
def on_GET(self, origin, content, query, context, event_id):
return self.handler.on_event_auth(origin, context, event_id)
class FederationSendJoinServlet(BaseFederationServlet):
- PATH = "/send_join/([^/]*)/([^/]*)"
+ PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, context, event_id):
@@ -341,7 +341,7 @@ class FederationSendJoinServlet(BaseFederationServlet):
class FederationInviteServlet(BaseFederationServlet):
- PATH = "/invite/([^/]*)/([^/]*)"
+ PATH = "/invite/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, context, event_id):
@@ -352,7 +352,7 @@ class FederationInviteServlet(BaseFederationServlet):
class FederationThirdPartyInviteExchangeServlet(BaseFederationServlet):
- PATH = "/exchange_third_party_invite/([^/]*)"
+ PATH = "/exchange_third_party_invite/(?P<room_id>[^/]*)"
@defer.inlineCallbacks
def on_PUT(self, origin, content, query, room_id):
@@ -381,7 +381,7 @@ class FederationClientKeysClaimServlet(BaseFederationServlet):
class FederationQueryAuthServlet(BaseFederationServlet):
- PATH = "/query_auth/([^/]*)/([^/]*)"
+ PATH = "/query_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, context, event_id):
@@ -394,7 +394,7 @@ class FederationQueryAuthServlet(BaseFederationServlet):
class FederationGetMissingEventsServlet(BaseFederationServlet):
# TODO(paul): Why does this path alone end with "/?" optional?
- PATH = "/get_missing_events/([^/]*)/?"
+ PATH = "/get_missing_events/(?P<room_id>[^/]*)/?"
@defer.inlineCallbacks
def on_POST(self, origin, content, query, room_id):
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 6bcc5a5e2b..8eeb225811 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -317,3 +317,25 @@ class DirectoryHandler(BaseHandler):
is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))
defer.returnValue(is_admin)
+
+ @defer.inlineCallbacks
+ def edit_published_room_list(self, requester, room_id, visibility):
+ """Edit the entry of the room in the published room list.
+
+ requester
+ room_id (str)
+ visibility (str): "public" or "private"
+ """
+ if requester.is_guest:
+ raise AuthError(403, "Guests cannot edit the published room list")
+
+ if visibility not in ["public", "private"]:
+ raise SynapseError(400, "Invalid visibility setting")
+
+ room = yield self.store.get_room(room_id)
+ if room is None:
+ raise SynapseError(400, "Unknown room")
+
+ yield self.auth.check_can_change_room_list(room_id, requester.user)
+
+ yield self.store.set_room_is_public(room_id, visibility == "public")
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index f599e817aa..267fedf114 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -102,7 +102,7 @@ class FederationHandler(BaseHandler):
@log_function
@defer.inlineCallbacks
- def on_receive_pdu(self, origin, pdu, backfilled, state=None,
+ def on_receive_pdu(self, origin, pdu, state=None,
auth_chain=None):
""" Called by the ReplicationLayer when we have a new pdu. We need to
do auth checks and put it through the StateHandler.
@@ -123,7 +123,6 @@ class FederationHandler(BaseHandler):
# FIXME (erikj): Awful hack to make the case where we are not currently
# in the room work
- current_state = None
is_in_room = yield self.auth.check_host_in_room(
event.room_id,
self.server_name
@@ -186,8 +185,6 @@ class FederationHandler(BaseHandler):
origin,
event,
state=state,
- backfilled=backfilled,
- current_state=current_state,
)
except AuthError as e:
raise FederationError(
@@ -216,18 +213,17 @@ class FederationHandler(BaseHandler):
except StoreError:
logger.exception("Failed to store room.")
- if not backfilled:
- extra_users = []
- if event.type == EventTypes.Member:
- target_user_id = event.state_key
- target_user = UserID.from_string(target_user_id)
- extra_users.append(target_user)
+ extra_users = []
+ if event.type == EventTypes.Member:
+ target_user_id = event.state_key
+ target_user = UserID.from_string(target_user_id)
+ extra_users.append(target_user)
- with PreserveLoggingContext():
- self.notifier.on_new_room_event(
- event, event_stream_id, max_stream_id,
- extra_users=extra_users
- )
+ with PreserveLoggingContext():
+ self.notifier.on_new_room_event(
+ event, event_stream_id, max_stream_id,
+ extra_users=extra_users
+ )
if event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
@@ -647,7 +643,7 @@ class FederationHandler(BaseHandler):
continue
try:
- self.on_receive_pdu(origin, p, backfilled=False)
+ self.on_receive_pdu(origin, p)
except:
logger.exception("Couldn't handle pdu")
@@ -779,7 +775,6 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
- backfilled=False,
)
target_user = UserID.from_string(event.state_key)
@@ -819,7 +814,6 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
- backfilled=False,
)
target_user = UserID.from_string(event.state_key)
@@ -1074,8 +1068,7 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
@log_function
- def _handle_new_event(self, origin, event, state=None, backfilled=False,
- current_state=None, auth_events=None):
+ def _handle_new_event(self, origin, event, state=None, auth_events=None):
outlier = event.internal_metadata.is_outlier()
@@ -1085,7 +1078,7 @@ class FederationHandler(BaseHandler):
auth_events=auth_events,
)
- if not backfilled and not event.internal_metadata.is_outlier():
+ if not event.internal_metadata.is_outlier():
action_generator = ActionGenerator(self.hs)
yield action_generator.handle_push_actions_for_event(
event, context, self
@@ -1094,9 +1087,7 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event,
context=context,
- backfilled=backfilled,
- is_new_state=(not outlier and not backfilled),
- current_state=current_state,
+ is_new_state=not outlier,
)
defer.returnValue((context, event_stream_id, max_stream_id))
@@ -1194,7 +1185,6 @@ class FederationHandler(BaseHandler):
event_stream_id, max_stream_id = yield self.store.persist_event(
event, new_event_context,
- backfilled=False,
is_new_state=True,
current_state=state,
)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 051468989f..133183a257 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -25,6 +25,7 @@ from synapse.api.constants import (
from synapse.api.errors import AuthError, StoreError, SynapseError, Codes
from synapse.util import stringutils, unwrapFirstError
from synapse.util.logcontext import preserve_context_over_fn
+from synapse.util.caches.response_cache import ResponseCache
from signedjson.sign import verify_signed_json
from signedjson.key import decode_verify_key_bytes
@@ -119,7 +120,8 @@ class RoomCreationHandler(BaseHandler):
invite_3pid_list = config.get("invite_3pid", [])
- is_public = config.get("visibility", None) == "public"
+ visibility = config.get("visibility", None)
+ is_public = visibility == "public"
# autogen room IDs and try to create it. We may clash, so just
# try a few times till one goes through, giving up eventually.
@@ -155,9 +157,9 @@ class RoomCreationHandler(BaseHandler):
preset_config = config.get(
"preset",
- RoomCreationPreset.PUBLIC_CHAT
- if is_public
- else RoomCreationPreset.PRIVATE_CHAT
+ RoomCreationPreset.PRIVATE_CHAT
+ if visibility == "private"
+ else RoomCreationPreset.PUBLIC_CHAT
)
raw_initial_state = config.get("initial_state", [])
@@ -938,61 +940,79 @@ class RoomMemberHandler(BaseHandler):
class RoomListHandler(BaseHandler):
+ def __init__(self, hs):
+ super(RoomListHandler, self).__init__(hs)
+ self.response_cache = ResponseCache()
- @defer.inlineCallbacks
def get_public_room_list(self):
+ result = self.response_cache.get(())
+ if not result:
+ result = self.response_cache.set((), self._get_public_room_list())
+ return result
+
+ @defer.inlineCallbacks
+ def _get_public_room_list(self):
room_ids = yield self.store.get_public_room_ids()
@defer.inlineCallbacks
def handle_room(room_id):
aliases = yield self.store.get_aliases_for_room(room_id)
- if not aliases:
- defer.returnValue(None)
- state = yield self.state_handler.get_current_state(room_id)
+ # We pull each bit of state out indvidually to avoid pulling the
+ # full state into memory. Due to how the caching works this should
+ # be fairly quick, even if not originally in the cache.
+ def get_state(etype, state_key):
+ return self.state_handler.get_current_state(room_id, etype, state_key)
+
+ # Double check that this is actually a public room.
+ join_rules_event = yield get_state(EventTypes.JoinRules, "")
+ if join_rules_event:
+ join_rule = join_rules_event.content.get("join_rule", None)
+ if join_rule and join_rule != JoinRules.PUBLIC:
+ defer.returnValue(None)
- result = {"aliases": aliases, "room_id": room_id}
+ result = {"room_id": room_id}
+ if aliases:
+ result["aliases"] = aliases
- name_event = state.get((EventTypes.Name, ""), None)
+ name_event = yield get_state(EventTypes.Name, "")
if name_event:
name = name_event.content.get("name", None)
if name:
result["name"] = name
- topic_event = state.get((EventTypes.Topic, ""), None)
+ topic_event = yield get_state(EventTypes.Topic, "")
if topic_event:
topic = topic_event.content.get("topic", None)
if topic:
result["topic"] = topic
- canonical_event = state.get((EventTypes.CanonicalAlias, ""), None)
+ canonical_event = yield get_state(EventTypes.CanonicalAlias, "")
if canonical_event:
canonical_alias = canonical_event.content.get("alias", None)
if canonical_alias:
result["canonical_alias"] = canonical_alias
- visibility_event = state.get((EventTypes.RoomHistoryVisibility, ""), None)
+ visibility_event = yield get_state(EventTypes.RoomHistoryVisibility, "")
visibility = None
if visibility_event:
visibility = visibility_event.content.get("history_visibility", None)
result["world_readable"] = visibility == "world_readable"
- guest_event = state.get((EventTypes.GuestAccess, ""), None)
+ guest_event = yield get_state(EventTypes.GuestAccess, "")
guest = None
if guest_event:
guest = guest_event.content.get("guest_access", None)
result["guest_can_join"] = guest == "can_join"
- avatar_event = state.get(("m.room.avatar", ""), None)
+ avatar_event = yield get_state("m.room.avatar", "")
if avatar_event:
avatar_url = avatar_event.content.get("url", None)
if avatar_url:
result["avatar_url"] = avatar_url
- result["num_joined_members"] = sum(
- 1 for (event_type, _), ev in state.items()
- if event_type == EventTypes.Member and ev.membership == Membership.JOIN
- )
+ joined_users = yield self.store.get_users_in_room(room_id)
+ result["num_joined_members"] = len(joined_users)
defer.returnValue(result)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 1f6fde8e8a..48ab5707e1 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,6 +20,7 @@ from synapse.api.constants import Membership, EventTypes
from synapse.util import unwrapFirstError
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.metrics import Measure
+from synapse.util.caches.response_cache import ResponseCache
from synapse.push.clientformat import format_push_rules_for_user
from twisted.internet import defer
@@ -35,6 +36,7 @@ SyncConfig = collections.namedtuple("SyncConfig", [
"user",
"filter_collection",
"is_guest",
+ "request_key",
])
@@ -136,8 +138,8 @@ class SyncHandler(BaseHandler):
super(SyncHandler, self).__init__(hs)
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
+ self.response_cache = ResponseCache()
- @defer.inlineCallbacks
def wait_for_sync_for_user(self, sync_config, since_token=None, timeout=0,
full_state=False):
"""Get the sync for a client if we have new data for it now. Otherwise
@@ -146,7 +148,19 @@ class SyncHandler(BaseHandler):
Returns:
A Deferred SyncResult.
"""
+ result = self.response_cache.get(sync_config.request_key)
+ if not result:
+ result = self.response_cache.set(
+ sync_config.request_key,
+ self._wait_for_sync_for_user(
+ sync_config, since_token, timeout, full_state
+ )
+ )
+ return result
+ @defer.inlineCallbacks
+ def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
+ full_state):
context = LoggingContext.current_context()
if context:
if since_token is None:
diff --git a/synapse/http/server.py b/synapse/http/server.py
index b17b190ee5..b82196fd5e 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -18,6 +18,7 @@ from synapse.api.errors import (
cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes
)
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+from synapse.util.caches import intern_dict
import synapse.metrics
import synapse.events
@@ -229,11 +230,12 @@ class JsonResource(HttpServer, resource.Resource):
else:
servlet_classname = "%r" % callback
- args = [
- urllib.unquote(u).decode("UTF-8") if u else u for u in m.groups()
- ]
+ kwargs = intern_dict({
+ name: urllib.unquote(value).decode("UTF-8") if value else value
+ for name, value in m.groupdict().items()
+ })
- callback_return = yield callback(request, *args)
+ callback_return = yield callback(request, **kwargs)
if callback_return is not None:
code, response = callback_return
self._send_response(request, code, response)
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 65ef1b68a3..296c4447ec 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -317,7 +317,7 @@ class Pusher(object):
@defer.inlineCallbacks
def _get_badge_count(self):
invites, joins = yield defer.gatherResults([
- self.store.get_invites_for_user(self.user_id),
+ self.store.get_invited_rooms_for_user(self.user_id),
self.store.get_rooms_for_user(self.user_id),
], consumeErrors=True)
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 86a2998bcc..792af70eb7 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -160,7 +160,27 @@ BASE_APPEND_OVRRIDE_RULES = [
'actions': [
'dont_notify',
]
- }
+ },
+ # Will we sometimes want to know about people joining and leaving?
+ # Perhaps: if so, this could be expanded upon. Seems the most usual case
+ # is that we don't though. We add this override rule so that even if
+ # the room rule is set to notify, we don't get notifications about
+ # join/leave/avatar/displayname events.
+ # See also: https://matrix.org/jira/browse/SYN-607
+ {
+ 'rule_id': 'global/override/.m.rule.member_event',
+ 'conditions': [
+ {
+ 'kind': 'event_match',
+ 'key': 'type',
+ 'pattern': 'm.room.member',
+ '_id': '_member',
+ }
+ ],
+ 'actions': [
+ 'dont_notify'
+ ]
+ },
]
@@ -261,25 +281,6 @@ BASE_APPEND_UNDERRIDE_RULES = [
}
]
},
- # This is too simple: https://matrix.org/jira/browse/SYN-607
- # Removing for now
- # {
- # 'rule_id': 'global/underride/.m.rule.member_event',
- # 'conditions': [
- # {
- # 'kind': 'event_match',
- # 'key': 'type',
- # 'pattern': 'm.room.member',
- # '_id': '_member',
- # }
- # ],
- # 'actions': [
- # 'notify', {
- # 'set_tweak': 'highlight',
- # 'value': False
- # }
- # ]
- # },
{
'rule_id': 'global/underride/.m.rule.message',
'conditions': [
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 87d5061fb0..76d7eb7ce0 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -107,7 +107,9 @@ class BulkPushRuleEvaluator:
users_dict.items(), [event], {event.event_id: current_state}
)
- evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room))
+ room_members = yield self.store.get_users_in_room(self.room_id)
+
+ evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
condition_cache = {}
diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py
index 59a23d6cb6..8ac09419dc 100644
--- a/synapse/rest/client/v1/directory.py
+++ b/synapse/rest/client/v1/directory.py
@@ -30,6 +30,7 @@ logger = logging.getLogger(__name__)
def register_servlets(hs, http_server):
ClientDirectoryServer(hs).register(http_server)
+ ClientDirectoryListServer(hs).register(http_server)
class ClientDirectoryServer(ClientV1RestServlet):
@@ -137,3 +138,44 @@ class ClientDirectoryServer(ClientV1RestServlet):
)
defer.returnValue((200, {}))
+
+
+class ClientDirectoryListServer(ClientV1RestServlet):
+ PATTERNS = client_path_patterns("/directory/list/room/(?P<room_id>[^/]*)$")
+
+ def __init__(self, hs):
+ super(ClientDirectoryListServer, self).__init__(hs)
+ self.store = hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def on_GET(self, request, room_id):
+ room = yield self.store.get_room(room_id)
+ if room is None:
+ raise SynapseError(400, "Unknown room")
+
+ defer.returnValue((200, {
+ "visibility": "public" if room["is_public"] else "private"
+ }))
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, room_id):
+ requester = yield self.auth.get_user_by_req(request)
+
+ content = parse_json_object_from_request(request)
+ visibility = content.get("visibility", "public")
+
+ yield self.handlers.directory_handler.edit_published_room_list(
+ requester, room_id, visibility,
+ )
+
+ defer.returnValue((200, {}))
+
+ @defer.inlineCallbacks
+ def on_DELETE(self, request, room_id):
+ requester = yield self.auth.get_user_by_req(request)
+
+ yield self.handlers.directory_handler.edit_published_room_list(
+ requester, room_id, "private",
+ )
+
+ defer.returnValue((200, {}))
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index de4a020ad4..c5785d7074 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -115,6 +115,8 @@ class SyncRestServlet(RestServlet):
)
)
+ request_key = (user, timeout, since, filter_id, full_state)
+
if filter_id:
if filter_id.startswith('{'):
try:
@@ -134,6 +136,7 @@ class SyncRestServlet(RestServlet):
user=user,
filter_collection=filter,
is_guest=requester.is_guest,
+ request_key=request_key,
)
if since is not None:
diff --git a/synapse/state.py b/synapse/state.py
index b9a1387520..41d32e664a 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.metrics import Measure
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
from synapse.api.auth import AuthEventTypes
@@ -27,6 +28,7 @@ from collections import namedtuple
import logging
import hashlib
+import os
logger = logging.getLogger(__name__)
@@ -34,8 +36,11 @@ logger = logging.getLogger(__name__)
KeyStateTuple = namedtuple("KeyStateTuple", ("context", "type", "state_key"))
-SIZE_OF_CACHE = 1000
-EVICTION_TIMEOUT_SECONDS = 20
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
+
+
+SIZE_OF_CACHE = int(1000 * CACHE_SIZE_FACTOR)
+EVICTION_TIMEOUT_SECONDS = 60 * 60
class _StateCacheEntry(object):
@@ -85,16 +90,8 @@ class StateHandler(object):
"""
event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
- cache = None
- if self._state_cache is not None:
- cache = self._state_cache.get(frozenset(event_ids), None)
-
- if cache:
- cache.ts = self.clock.time_msec()
- state = cache.state
- else:
- res = yield self.resolve_state_groups(room_id, event_ids)
- state = res[1]
+ res = yield self.resolve_state_groups(room_id, event_ids)
+ state = res[1]
if event_type:
defer.returnValue(state.get((event_type, state_key)))
@@ -186,20 +183,6 @@ class StateHandler(object):
"""
logger.debug("resolve_state_groups event_ids %s", event_ids)
- if self._state_cache is not None:
- cache = self._state_cache.get(frozenset(event_ids), None)
- if cache and cache.state_group:
- cache.ts = self.clock.time_msec()
- prev_state = cache.state.get((event_type, state_key), None)
- if prev_state:
- prev_state = prev_state.event_id
- prev_states = [prev_state]
- else:
- prev_states = []
- defer.returnValue(
- (cache.state_group, cache.state, prev_states)
- )
-
state_groups = yield self.store.get_state_groups(
room_id, event_ids
)
@@ -209,7 +192,7 @@ class StateHandler(object):
state_groups.keys()
)
- group_names = set(state_groups.keys())
+ group_names = frozenset(state_groups.keys())
if len(group_names) == 1:
name, state_list = state_groups.items().pop()
state = {
@@ -223,16 +206,25 @@ class StateHandler(object):
else:
prev_states = []
- if self._state_cache is not None:
- cache = _StateCacheEntry(
- state=state,
- state_group=name,
- ts=self.clock.time_msec()
- )
+ defer.returnValue((name, state, prev_states))
- self._state_cache[frozenset(event_ids)] = cache
+ if self._state_cache is not None:
+ cache = self._state_cache.get(group_names, None)
+ if cache and cache.state_group:
+ cache.ts = self.clock.time_msec()
- defer.returnValue((name, state, prev_states))
+ event_dict = yield self.store.get_events(cache.state.values())
+ state = {(e.type, e.state_key): e for e in event_dict.values()}
+
+ prev_state = state.get((event_type, state_key), None)
+ if prev_state:
+ prev_state = prev_state.event_id
+ prev_states = [prev_state]
+ else:
+ prev_states = []
+ defer.returnValue(
+ (cache.state_group, state, prev_states)
+ )
new_state, prev_states = self._resolve_events(
state_groups.values(), event_type, state_key
@@ -240,12 +232,12 @@ class StateHandler(object):
if self._state_cache is not None:
cache = _StateCacheEntry(
- state=new_state,
+ state={key: event.event_id for key, event in new_state.items()},
state_group=None,
ts=self.clock.time_msec()
)
- self._state_cache[frozenset(event_ids)] = cache
+ self._state_cache[group_names] = cache
defer.returnValue((None, new_state, prev_states))
@@ -263,48 +255,49 @@ class StateHandler(object):
from (type, state_key) to event. prev_states is a list of event_ids.
:rtype: (dict[(str, str), synapse.events.FrozenEvent], list[str])
"""
- state = {}
- for st in state_sets:
- for e in st:
- state.setdefault(
- (e.type, e.state_key),
- {}
- )[e.event_id] = e
-
- unconflicted_state = {
- k: v.values()[0] for k, v in state.items()
- if len(v.values()) == 1
- }
-
- conflicted_state = {
- k: v.values()
- for k, v in state.items()
- if len(v.values()) > 1
- }
+ with Measure(self.clock, "state._resolve_events"):
+ state = {}
+ for st in state_sets:
+ for e in st:
+ state.setdefault(
+ (e.type, e.state_key),
+ {}
+ )[e.event_id] = e
+
+ unconflicted_state = {
+ k: v.values()[0] for k, v in state.items()
+ if len(v.values()) == 1
+ }
- if event_type:
- prev_states_events = conflicted_state.get(
- (event_type, state_key), []
- )
- prev_states = [s.event_id for s in prev_states_events]
- else:
- prev_states = []
+ conflicted_state = {
+ k: v.values()
+ for k, v in state.items()
+ if len(v.values()) > 1
+ }
+
+ if event_type:
+ prev_states_events = conflicted_state.get(
+ (event_type, state_key), []
+ )
+ prev_states = [s.event_id for s in prev_states_events]
+ else:
+ prev_states = []
- auth_events = {
- k: e for k, e in unconflicted_state.items()
- if k[0] in AuthEventTypes
- }
+ auth_events = {
+ k: e for k, e in unconflicted_state.items()
+ if k[0] in AuthEventTypes
+ }
- try:
- resolved_state = self._resolve_state_events(
- conflicted_state, auth_events
- )
- except:
- logger.exception("Failed to resolve state")
- raise
+ try:
+ resolved_state = self._resolve_state_events(
+ conflicted_state, auth_events
+ )
+ except:
+ logger.exception("Failed to resolve state")
+ raise
- new_state = unconflicted_state
- new_state.update(resolved_state)
+ new_state = unconflicted_state
+ new_state.update(resolved_state)
return new_state, prev_states
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 7dc67ecd57..b75b79df36 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -18,6 +18,7 @@ from synapse.api.errors import StoreError
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.caches.descriptors import Cache
+from synapse.util.caches import intern_dict
import synapse.metrics
@@ -26,6 +27,10 @@ from twisted.internet import defer
import sys
import time
import threading
+import os
+
+
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
logger = logging.getLogger(__name__)
@@ -163,7 +168,9 @@ class SQLBaseStore(object):
self._get_event_cache = Cache("*getEvent*", keylen=3, lru=True,
max_entries=hs.config.event_cache_size)
- self._state_group_cache = DictionaryCache("*stateGroupCache*", 2000)
+ self._state_group_cache = DictionaryCache(
+ "*stateGroupCache*", 2000 * CACHE_SIZE_FACTOR
+ )
self._event_fetch_lock = threading.Condition()
self._event_fetch_list = []
@@ -344,7 +351,7 @@ class SQLBaseStore(object):
"""
col_headers = list(column[0] for column in cursor.description)
results = list(
- dict(zip(col_headers, row)) for row in cursor.fetchall()
+ intern_dict(dict(zip(col_headers, row))) for row in cursor.fetchall()
)
return results
diff --git a/synapse/storage/directory.py b/synapse/storage/directory.py
index 012a0b414a..ef231a04dc 100644
--- a/synapse/storage/directory.py
+++ b/synapse/storage/directory.py
@@ -155,7 +155,7 @@ class DirectoryStore(SQLBaseStore):
return room_id
- @cached()
+ @cached(max_entries=5000)
def get_aliases_for_room(self, room_id):
return self._simple_select_onecol(
"room_aliases",
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 5820539a92..dc5830450a 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -49,7 +49,7 @@ class EventPushActionsStore(SQLBaseStore):
)
self._simple_insert_many_txn(txn, "event_push_actions", values)
- @cachedInlineCallbacks(num_args=3, lru=True, tree=True)
+ @cachedInlineCallbacks(num_args=3, lru=True, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id
):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 285c586cfe..5233430028 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -101,30 +101,16 @@ class EventsStore(SQLBaseStore):
@defer.inlineCallbacks
@log_function
- def persist_event(self, event, context, backfilled=False,
+ def persist_event(self, event, context,
is_new_state=True, current_state=None):
- stream_ordering = None
- if backfilled:
- self.min_stream_token -= 1
- stream_ordering = self.min_stream_token
-
- if stream_ordering is None:
- stream_ordering_manager = self._stream_id_gen.get_next()
- else:
- @contextmanager
- def stream_ordering_manager():
- yield stream_ordering
- stream_ordering_manager = stream_ordering_manager()
-
try:
- with stream_ordering_manager as stream_ordering:
+ with self._stream_id_gen.get_next() as stream_ordering:
event.internal_metadata.stream_ordering = stream_ordering
yield self.runInteraction(
"persist_event",
self._persist_event_txn,
event=event,
context=context,
- backfilled=backfilled,
is_new_state=is_new_state,
current_state=current_state,
)
@@ -165,13 +151,38 @@ class EventsStore(SQLBaseStore):
defer.returnValue(events[0] if events else None)
+ @defer.inlineCallbacks
+ def get_events(self, event_ids, check_redacted=True,
+ get_prev_content=False, allow_rejected=False):
+ """Get events from the database
+
+ Args:
+ event_ids (list): The event_ids of the events to fetch
+ check_redacted (bool): If True, check if event has been redacted
+ and redact it.
+ get_prev_content (bool): If True and event is a state event,
+ include the previous states content in the unsigned field.
+ allow_rejected (bool): If True return rejected events.
+
+ Returns:
+ Deferred : Dict from event_id to event.
+ """
+ events = yield self._get_events(
+ event_ids,
+ check_redacted=check_redacted,
+ get_prev_content=get_prev_content,
+ allow_rejected=allow_rejected,
+ )
+
+ defer.returnValue({e.event_id: e for e in events})
+
@log_function
- def _persist_event_txn(self, txn, event, context, backfilled,
+ def _persist_event_txn(self, txn, event, context,
is_new_state=True, current_state=None):
# We purposefully do this first since if we include a `current_state`
# key, we *want* to update the `current_state_events` table
if current_state:
- txn.call_after(self.get_current_state_for_key.invalidate_all)
+ txn.call_after(self._get_current_state_for_key.invalidate_all)
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
@@ -198,7 +209,7 @@ class EventsStore(SQLBaseStore):
return self._persist_events_txn(
txn,
[(event, context)],
- backfilled=backfilled,
+ backfilled=False,
is_new_state=is_new_state,
)
@@ -455,7 +466,7 @@ class EventsStore(SQLBaseStore):
for event, _ in state_events_and_contexts:
if not context.rejected:
txn.call_after(
- self.get_current_state_for_key.invalidate,
+ self._get_current_state_for_key.invalidate,
(event.room_id, event.type, event.state_key,)
)
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index dbc074d6b5..6b9d848eaa 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -62,18 +62,17 @@ class ReceiptsStore(SQLBaseStore):
@cachedInlineCallbacks(num_args=2)
def get_receipts_for_user(self, user_id, receipt_type):
- def f(txn):
- sql = (
- "SELECT room_id,event_id "
- "FROM receipts_linearized "
- "WHERE user_id = ? AND receipt_type = ? "
- )
- txn.execute(sql, (user_id, receipt_type))
- return txn.fetchall()
+ rows = yield self._simple_select_list(
+ table="receipts_linearized",
+ keyvalues={
+ "user_id": user_id,
+ "receipt_type": receipt_type,
+ },
+ retcols=("room_id", "event_id"),
+ desc="get_receipts_for_user",
+ )
- defer.returnValue(dict(
- (yield self.runInteraction("get_receipts_for_user", f))
- ))
+ defer.returnValue({row["room_id"]: row["event_id"] for row in rows})
@defer.inlineCallbacks
def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None):
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 46ab38a313..9be977f387 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -77,6 +77,14 @@ class RoomStore(SQLBaseStore):
allow_none=True,
)
+ def set_room_is_public(self, room_id, is_public):
+ return self._simple_update_one(
+ table="rooms",
+ keyvalues={"room_id": room_id},
+ updatevalues={"is_public": is_public},
+ desc="set_room_is_public",
+ )
+
def get_public_room_ids(self):
return self._simple_select_onecol(
table="rooms",
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 0cd89260f2..430b49c12e 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -115,19 +115,17 @@ class RoomMemberStore(SQLBaseStore):
).addCallback(self._get_events)
@cached()
- def get_invites_for_user(self, user_id):
- """ Get all the invite events for a user
+ def get_invited_rooms_for_user(self, user_id):
+ """ Get all the rooms the user is invited to
Args:
user_id (str): The user ID.
Returns:
- A deferred list of event objects.
+ A deferred list of RoomsForUser.
"""
return self.get_rooms_for_user_where_membership_is(
user_id, [Membership.INVITE]
- ).addCallback(lambda invites: self._get_events([
- invite.event_id for invite in invites
- ]))
+ )
def get_leave_and_ban_events_for_user(self, user_id):
""" Get all the leave events for a user
diff --git a/synapse/storage/schema/delta/30/public_rooms.sql b/synapse/storage/schema/delta/30/public_rooms.sql
new file mode 100644
index 0000000000..f09db4faa6
--- /dev/null
+++ b/synapse/storage/schema/delta/30/public_rooms.sql
@@ -0,0 +1,23 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/* This release removes the restriction that published rooms must have an alias,
+ * so we go back and ensure the only 'public' rooms are ones with an alias.
+ * We use (1 = 0) and (1 = 1) so that it works in both postgres and sqlite
+ */
+UPDATE rooms SET is_public = (1 = 0) WHERE is_public = (1 = 1) AND room_id not in (
+ SELECT room_id FROM room_aliases
+);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 8ed8a21b0a..02cefdff26 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -14,9 +14,8 @@
# limitations under the License.
from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import (
- cached, cachedInlineCallbacks, cachedList
-)
+from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches import intern_string
from twisted.internet import defer
@@ -155,8 +154,14 @@ class StateStore(SQLBaseStore):
events = yield self._get_events(event_ids, get_prev_content=False)
defer.returnValue(events)
- @cachedInlineCallbacks(num_args=3)
+ @defer.inlineCallbacks
def get_current_state_for_key(self, room_id, event_type, state_key):
+ event_ids = yield self._get_current_state_for_key(room_id, event_type, state_key)
+ events = yield self._get_events(event_ids, get_prev_content=False)
+ defer.returnValue(events)
+
+ @cached(num_args=3)
+ def _get_current_state_for_key(self, room_id, event_type, state_key):
def f(txn):
sql = (
"SELECT event_id FROM current_state_events"
@@ -167,12 +172,10 @@ class StateStore(SQLBaseStore):
txn.execute(sql, args)
results = txn.fetchall()
return [r[0] for r in results]
- event_ids = yield self.runInteraction("get_current_state_for_key", f)
- events = yield self._get_events(event_ids, get_prev_content=False)
- defer.returnValue(events)
+ return self.runInteraction("get_current_state_for_key", f)
def _get_state_groups_from_groups(self, groups, types):
- """Returns dictionary state_group -> state event ids
+ """Returns dictionary state_group -> (dict of (type, state_key) -> event id)
"""
def f(txn, groups):
if types is not None:
@@ -183,7 +186,8 @@ class StateStore(SQLBaseStore):
where_clause = ""
sql = (
- "SELECT state_group, event_id FROM state_groups_state WHERE"
+ "SELECT state_group, event_id, type, state_key"
+ " FROM state_groups_state WHERE"
" state_group IN (%s) %s" % (
",".join("?" for _ in groups),
where_clause,
@@ -199,7 +203,8 @@ class StateStore(SQLBaseStore):
results = {}
for row in rows:
- results.setdefault(row["state_group"], []).append(row["event_id"])
+ key = (row["type"], row["state_key"])
+ results.setdefault(row["state_group"], {})[key] = row["event_id"]
return results
chunks = [groups[i:i + 100] for i in xrange(0, len(groups), 100)]
@@ -296,7 +301,7 @@ class StateStore(SQLBaseStore):
where a `state_key` of `None` matches all state_keys for the
`type`.
"""
- is_all, state_dict = self._state_group_cache.get(group)
+ is_all, state_dict_ids = self._state_group_cache.get(group)
type_to_key = {}
missing_types = set()
@@ -308,7 +313,7 @@ class StateStore(SQLBaseStore):
if type_to_key.get(typ, object()) is not None:
type_to_key.setdefault(typ, set()).add(state_key)
- if (typ, state_key) not in state_dict:
+ if (typ, state_key) not in state_dict_ids:
missing_types.add((typ, state_key))
sentinel = object()
@@ -326,7 +331,7 @@ class StateStore(SQLBaseStore):
got_all = not (missing_types or types is None)
return {
- k: v for k, v in state_dict.items()
+ k: v for k, v in state_dict_ids.items()
if include(k[0], k[1])
}, missing_types, got_all
@@ -340,8 +345,9 @@ class StateStore(SQLBaseStore):
Args:
group: The state group to lookup
"""
- is_all, state_dict = self._state_group_cache.get(group)
- return state_dict, is_all
+ is_all, state_dict_ids = self._state_group_cache.get(group)
+
+ return state_dict_ids, is_all
@defer.inlineCallbacks
def _get_state_for_groups(self, groups, types=None):
@@ -354,84 +360,72 @@ class StateStore(SQLBaseStore):
missing_groups = []
if types is not None:
for group in set(groups):
- state_dict, missing_types, got_all = self._get_some_state_from_cache(
+ state_dict_ids, missing_types, got_all = self._get_some_state_from_cache(
group, types
)
- results[group] = state_dict
+ results[group] = state_dict_ids
if not got_all:
missing_groups.append(group)
else:
for group in set(groups):
- state_dict, got_all = self._get_all_state_from_cache(
+ state_dict_ids, got_all = self._get_all_state_from_cache(
group
)
- results[group] = state_dict
+
+ results[group] = state_dict_ids
if not got_all:
missing_groups.append(group)
- if not missing_groups:
- defer.returnValue({
- group: {
- type_tuple: event
- for type_tuple, event in state.items()
- if event
- }
- for group, state in results.items()
- })
+ if missing_groups:
+ # Okay, so we have some missing_types, lets fetch them.
+ cache_seq_num = self._state_group_cache.sequence
- # Okay, so we have some missing_types, lets fetch them.
- cache_seq_num = self._state_group_cache.sequence
+ group_to_state_dict = yield self._get_state_groups_from_groups(
+ missing_groups, types
+ )
- group_state_dict = yield self._get_state_groups_from_groups(
- missing_groups, types
- )
+ # Now we want to update the cache with all the things we fetched
+ # from the database.
+ for group, group_state_dict in group_to_state_dict.items():
+ if types:
+ # We delibrately put key -> None mappings into the cache to
+ # cache absence of the key, on the assumption that if we've
+ # explicitly asked for some types then we will probably ask
+ # for them again.
+ state_dict = {
+ (intern_string(etype), intern_string(state_key)): None
+ for (etype, state_key) in types
+ }
+ state_dict.update(results[group])
+ results[group] = state_dict
+ else:
+ state_dict = results[group]
+
+ state_dict.update(group_state_dict)
+
+ self._state_group_cache.update(
+ cache_seq_num,
+ key=group,
+ value=state_dict,
+ full=(types is None),
+ )
state_events = yield self._get_events(
- [e_id for l in group_state_dict.values() for e_id in l],
+ [ev_id for sd in results.values() for ev_id in sd.values()],
get_prev_content=False
)
state_events = {e.event_id: e for e in state_events}
- # Now we want to update the cache with all the things we fetched
- # from the database.
- for group, state_ids in group_state_dict.items():
- if types:
- # We delibrately put key -> None mappings into the cache to
- # cache absence of the key, on the assumption that if we've
- # explicitly asked for some types then we will probably ask
- # for them again.
- state_dict = {key: None for key in types}
- state_dict.update(results[group])
- results[group] = state_dict
- else:
- state_dict = results[group]
-
- for event_id in state_ids:
- try:
- state_event = state_events[event_id]
- state_dict[(state_event.type, state_event.state_key)] = state_event
- except KeyError:
- # Hmm. So we do don't have that state event? Interesting.
- logger.warn(
- "Can't find state event %r for state group %r",
- event_id, group,
- )
-
- self._state_group_cache.update(
- cache_seq_num,
- key=group,
- value=state_dict,
- full=(types is None),
- )
-
# Remove all the entries with None values. The None values were just
# used for bookkeeping in the cache.
for group, state_dict in results.items():
results[group] = {
- key: event for key, event in state_dict.items() if event
+ key: state_events[event_id]
+ for key, event_id in state_dict.items()
+ if event_id and event_id in state_events
}
defer.returnValue(results)
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 7f4a827528..cf84938be5 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -36,7 +36,7 @@ what sort order was used:
from twisted.internet import defer
from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.util.caches.descriptors import cached
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
from synapse.util.logcontext import preserve_fn
@@ -465,9 +465,25 @@ class StreamStore(SQLBaseStore):
defer.returnValue((events, token))
- @cachedInlineCallbacks(num_args=4)
+ @defer.inlineCallbacks
def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
+ rows, token = yield self.get_recent_event_ids_for_room(
+ room_id, limit, end_token, from_token
+ )
+
+ logger.debug("stream before")
+ events = yield self._get_events(
+ [r["event_id"] for r in rows],
+ get_prev_content=True
+ )
+ logger.debug("stream after")
+
+ self._set_before_and_after(events, rows)
+
+ defer.returnValue((events, token))
+ @cached(num_args=4)
+ def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
end_token = RoomStreamToken.parse_stream_token(end_token)
if from_token is None:
@@ -517,21 +533,10 @@ class StreamStore(SQLBaseStore):
return rows, token
- rows, token = yield self.runInteraction(
+ return self.runInteraction(
"get_recent_events_for_room", get_recent_events_for_room_txn
)
- logger.debug("stream before")
- events = yield self._get_events(
- [r["event_id"] for r in rows],
- get_prev_content=True
- )
- logger.debug("stream after")
-
- self._set_before_and_after(events, rows)
-
- defer.returnValue((events, token))
-
@defer.inlineCallbacks
def get_room_events_max_id(self, direction='f'):
token = yield self._stream_id_gen.get_max_token()
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index 1a14904194..d53569ca49 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -14,6 +14,10 @@
# limitations under the License.
import synapse.metrics
+from lrucache import LruCache
+import os
+
+CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
DEBUG_CACHES = False
@@ -25,3 +29,56 @@ cache_counter = metrics.register_cache(
lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
labels=["name"],
)
+
+_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR))
+caches_by_name["string_cache"] = _string_cache
+
+
+KNOWN_KEYS = {
+ key: key for key in
+ (
+ "auth_events",
+ "content",
+ "depth",
+ "event_id",
+ "hashes",
+ "origin",
+ "origin_server_ts",
+ "prev_events",
+ "room_id",
+ "sender",
+ "signatures",
+ "state_key",
+ "type",
+ "unsigned",
+ "user_id",
+ )
+}
+
+
+def intern_string(string):
+ """Takes a (potentially) unicode string and interns using custom cache
+ """
+ return _string_cache.setdefault(string, string)
+
+
+def intern_dict(dictionary):
+ """Takes a dictionary and interns well known keys and their values
+ """
+ return {
+ KNOWN_KEYS.get(key, key): _intern_known_values(key, value)
+ for key, value in dictionary.items()
+ }
+
+
+def _intern_known_values(key, value):
+ intern_str_keys = ("event_id", "room_id")
+ intern_unicode_keys = ("sender", "user_id", "type", "state_key")
+
+ if key in intern_str_keys:
+ return intern(value.encode('ascii'))
+
+ if key in intern_unicode_keys:
+ return intern_string(value)
+
+ return value
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index f7423f2fab..f9df445a8d 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -29,6 +29,16 @@ def enumerate_leaves(node, depth):
yield m
+class _Node(object):
+ __slots__ = ["prev_node", "next_node", "key", "value"]
+
+ def __init__(self, prev_node, next_node, key, value):
+ self.prev_node = prev_node
+ self.next_node = next_node
+ self.key = key
+ self.value = value
+
+
class LruCache(object):
"""
Least-recently-used cache.
@@ -38,10 +48,9 @@ class LruCache(object):
def __init__(self, max_size, keylen=1, cache_type=dict):
cache = cache_type()
self.cache = cache # Used for introspection.
- list_root = []
- list_root[:] = [list_root, list_root, None, None]
-
- PREV, NEXT, KEY, VALUE = 0, 1, 2, 3
+ list_root = _Node(None, None, None, None)
+ list_root.next_node = list_root
+ list_root.prev_node = list_root
lock = threading.Lock()
@@ -55,36 +64,36 @@ class LruCache(object):
def add_node(key, value):
prev_node = list_root
- next_node = prev_node[NEXT]
- node = [prev_node, next_node, key, value]
- prev_node[NEXT] = node
- next_node[PREV] = node
+ next_node = prev_node.next_node
+ node = _Node(prev_node, next_node, key, value)
+ prev_node.next_node = node
+ next_node.prev_node = node
cache[key] = node
def move_node_to_front(node):
- prev_node = node[PREV]
- next_node = node[NEXT]
- prev_node[NEXT] = next_node
- next_node[PREV] = prev_node
+ prev_node = node.prev_node
+ next_node = node.next_node
+ prev_node.next_node = next_node
+ next_node.prev_node = prev_node
prev_node = list_root
- next_node = prev_node[NEXT]
- node[PREV] = prev_node
- node[NEXT] = next_node
- prev_node[NEXT] = node
- next_node[PREV] = node
+ next_node = prev_node.next_node
+ node.prev_node = prev_node
+ node.next_node = next_node
+ prev_node.next_node = node
+ next_node.prev_node = node
def delete_node(node):
- prev_node = node[PREV]
- next_node = node[NEXT]
- prev_node[NEXT] = next_node
- next_node[PREV] = prev_node
+ prev_node = node.prev_node
+ next_node = node.next_node
+ prev_node.next_node = next_node
+ next_node.prev_node = prev_node
@synchronized
def cache_get(key, default=None):
node = cache.get(key, None)
if node is not None:
move_node_to_front(node)
- return node[VALUE]
+ return node.value
else:
return default
@@ -93,25 +102,25 @@ class LruCache(object):
node = cache.get(key, None)
if node is not None:
move_node_to_front(node)
- node[VALUE] = value
+ node.value = value
else:
add_node(key, value)
if len(cache) > max_size:
- todelete = list_root[PREV]
+ todelete = list_root.prev_node
delete_node(todelete)
- cache.pop(todelete[KEY], None)
+ cache.pop(todelete.key, None)
@synchronized
def cache_set_default(key, value):
node = cache.get(key, None)
if node is not None:
- return node[VALUE]
+ return node.value
else:
add_node(key, value)
if len(cache) > max_size:
- todelete = list_root[PREV]
+ todelete = list_root.prev_node
delete_node(todelete)
- cache.pop(todelete[KEY], None)
+ cache.pop(todelete.key, None)
return value
@synchronized
@@ -119,8 +128,8 @@ class LruCache(object):
node = cache.get(key, None)
if node:
delete_node(node)
- cache.pop(node[KEY], None)
- return node[VALUE]
+ cache.pop(node.key, None)
+ return node.value
else:
return default
@@ -137,8 +146,8 @@ class LruCache(object):
@synchronized
def cache_clear():
- list_root[NEXT] = list_root
- list_root[PREV] = list_root
+ list_root.next_node = list_root
+ list_root.prev_node = list_root
cache.clear()
@synchronized
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
new file mode 100644
index 0000000000..be310ba320
--- /dev/null
+++ b/synapse/util/caches/response_cache.py
@@ -0,0 +1,46 @@
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.util.async import ObservableDeferred
+
+
+class ResponseCache(object):
+ """
+ This caches a deferred response. Until the deferred completes it will be
+ returned from the cache. This means that if the client retries the request
+ while the response is still being computed, that original response will be
+ used rather than trying to compute a new response.
+ """
+
+ def __init__(self):
+ self.pending_result_cache = {} # Requests that haven't finished yet.
+
+ def get(self, key):
+ result = self.pending_result_cache.get(key)
+ if result is not None:
+ return result.observe()
+ else:
+ return None
+
+ def set(self, key, deferred):
+ result = ObservableDeferred(deferred)
+ self.pending_result_cache[key] = result
+
+ def remove(r):
+ self.pending_result_cache.pop(key, None)
+ return r
+
+ result.addBoth(remove)
+ return result.observe()
|