diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index cb710fe796..3465a787ab 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -38,7 +38,10 @@ class ProfileHandler(BaseHandler):
self.user_directory_handler = hs.get_user_directory_handler()
- self.clock.looping_call(self._update_remote_profile_cache, self.PROFILE_UPDATE_MS)
+ if hs.config.worker_app is None:
+ self.clock.looping_call(
+ self._update_remote_profile_cache, self.PROFILE_UPDATE_MS,
+ )
@defer.inlineCallbacks
def get_profile(self, user_id):
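
The guard added above keeps the remote-profile-cache refresh loop on the main process only: hs.config.worker_app is None there and is set to the worker's application name on worker processes. A minimal, self-contained sketch of that gate (the function, the fake scheduler and the period below are illustrative stand-ins, not Synapse's API):

    # `schedule` plays the role of clock.looping_call in the hunk above.
    def maybe_schedule_cache_refresh(worker_app, schedule, refresh, period_ms):
        # Only the main process (worker_app is None) maintains the remote
        # profile cache; workers just read it via the slaved store added below.
        if worker_app is None:
            schedule(refresh, period_ms)

    scheduled = []
    fake_schedule = lambda fn, ms: scheduled.append((fn, ms))
    maybe_schedule_cache_refresh(None, fake_schedule, lambda: None, 60 * 1000)
    maybe_schedule_cache_refresh("synapse.app.frontend_proxy",
                                 fake_schedule, lambda: None, 60 * 1000)
    assert len(scheduled) == 1  # scheduled once, on the master only
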
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 1d7e6997b9..9977be8831 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -370,7 +370,7 @@ class RoomMemberHandler(object):
content["kind"] = "guest"
ret = yield self._remote_join(
- remote_room_hosts, room_id, target, content
+ requester, remote_room_hosts, room_id, target, content
)
defer.returnValue(ret)
@@ -392,7 +392,7 @@ class RoomMemberHandler(object):
# send the rejection to the inviter's HS.
remote_room_hosts = remote_room_hosts + [inviter.domain]
res = yield self._remote_reject_invite(
- remote_room_hosts, room_id, target,
+ requester, remote_room_hosts, room_id, target,
)
defer.returnValue(res)
@@ -849,7 +849,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
self.distributor.declare("user_left_room")
@defer.inlineCallbacks
- def _remote_join(self, remote_room_hosts, room_id, user, content):
+ def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join
"""
if len(remote_room_hosts) == 0:
@@ -868,7 +868,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
yield self._user_joined_room(user, room_id)
@defer.inlineCallbacks
- def _remote_reject_invite(self, remote_room_hosts, room_id, target):
+ def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
"""Implements RoomMemberHandler._remote_reject_invite
"""
fed_handler = self.federation_handler
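
Threading requester through _remote_join and _remote_reject_invite does not change the master implementation (it still goes via the federation handler), but it gives other implementations of RoomMemberHandler access to the requesting user's context. A rough sketch of what a non-master implementation could do with it; the class name and the forwarding callable are hypothetical, not Synapse's replication API:

    class RoomMemberWorkerHandlerSketch(object):
        """Illustrative only: forwards the whole join, requester included."""

        def __init__(self, forward):
            self._forward = forward  # e.g. an HTTP call back to the master

        def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
            return self._forward(
                "remote_join",
                requester=requester,
                remote_room_hosts=remote_room_hosts,
                room_id=room_id,
                user_id=str(user),
                content=content,
            )

    captured = {}
    handler = RoomMemberWorkerHandlerSketch(lambda name, **kw: captured.update(kw))
    handler._remote_join("some-requester", ["hs1.example.org"], "!r:hs1", "@u:hs2", {})
    assert captured["requester"] == "some-requester"
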
diff --git a/synapse/replication/slave/storage/profile.py b/synapse/replication/slave/storage/profile.py
new file mode 100644
index 0000000000..46c28d4171
--- /dev/null
+++ b/synapse/replication/slave/storage/profile.py
@@ -0,0 +1,21 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.storage.profile import ProfileWorkerStore
+
+
+class SlavedProfileStore(ProfileWorkerStore, BaseSlavedStore):
+ pass
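
A worker picks this up by mixing the slaved class into its datastore alongside the other slaved stores it needs; the snippet below only gestures at that composition (the real worker stores live in synapse/app/*, and the class name here is illustrative):

    # Illustrative composition only: the worker's store class gains the
    # read-side ProfileWorkerStore methods via SlavedProfileStore and
    # nothing else, so profile writes stay on the master.
    class WorkerDataStoreSketch(SlavedProfileStore):  # plus other Slaved* mixins
        pass
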
diff --git a/synapse/storage/profile.py b/synapse/storage/profile.py
index ec02e73bc2..8612bd5ecc 100644
--- a/synapse/storage/profile.py
+++ b/synapse/storage/profile.py
@@ -21,14 +21,7 @@ from synapse.api.errors import StoreError
from ._base import SQLBaseStore
-class ProfileStore(SQLBaseStore):
- def create_profile(self, user_localpart):
- return self._simple_insert(
- table="profiles",
- values={"user_id": user_localpart},
- desc="create_profile",
- )
-
+class ProfileWorkerStore(SQLBaseStore):
@defer.inlineCallbacks
def get_profileinfo(self, user_localpart):
try:
@@ -61,14 +54,6 @@ class ProfileStore(SQLBaseStore):
desc="get_profile_displayname",
)
- def set_profile_displayname(self, user_localpart, new_displayname):
- return self._simple_update_one(
- table="profiles",
- keyvalues={"user_id": user_localpart},
- updatevalues={"displayname": new_displayname},
- desc="set_profile_displayname",
- )
-
def get_profile_avatar_url(self, user_localpart):
return self._simple_select_one_onecol(
table="profiles",
@@ -77,14 +62,6 @@ class ProfileStore(SQLBaseStore):
desc="get_profile_avatar_url",
)
- def set_profile_avatar_url(self, user_localpart, new_avatar_url):
- return self._simple_update_one(
- table="profiles",
- keyvalues={"user_id": user_localpart},
- updatevalues={"avatar_url": new_avatar_url},
- desc="set_profile_avatar_url",
- )
-
def get_from_remote_profile_cache(self, user_id):
return self._simple_select_one(
table="remote_profile_cache",
@@ -94,6 +71,31 @@ class ProfileStore(SQLBaseStore):
desc="get_from_remote_profile_cache",
)
+
+class ProfileStore(ProfileWorkerStore):
+ def create_profile(self, user_localpart):
+ return self._simple_insert(
+ table="profiles",
+ values={"user_id": user_localpart},
+ desc="create_profile",
+ )
+
+ def set_profile_displayname(self, user_localpart, new_displayname):
+ return self._simple_update_one(
+ table="profiles",
+ keyvalues={"user_id": user_localpart},
+ updatevalues={"displayname": new_displayname},
+ desc="set_profile_displayname",
+ )
+
+ def set_profile_avatar_url(self, user_localpart, new_avatar_url):
+ return self._simple_update_one(
+ table="profiles",
+ keyvalues={"user_id": user_localpart},
+ updatevalues={"avatar_url": new_avatar_url},
+ desc="set_profile_avatar_url",
+ )
+
def add_remote_profile_cache(self, user_id, displayname, avatar_url):
"""Ensure we are caching the remote user's profiles.
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 7f2c08d7a6..34ed84ea22 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -157,6 +157,18 @@ class RoomWorkerStore(SQLBaseStore):
"get_public_room_changes", get_public_room_changes_txn
)
+ @cached(max_entries=10000)
+ def is_room_blocked(self, room_id):
+ return self._simple_select_one_onecol(
+ table="blocked_rooms",
+ keyvalues={
+ "room_id": room_id,
+ },
+ retcol="1",
+ allow_none=True,
+ desc="is_room_blocked",
+ )
+
class RoomStore(RoomWorkerStore, SearchStore):
@@ -485,18 +497,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
else:
defer.returnValue(None)
- @cached(max_entries=10000)
- def is_room_blocked(self, room_id):
- return self._simple_select_one_onecol(
- table="blocked_rooms",
- keyvalues={
- "room_id": room_id,
- },
- retcol="1",
- allow_none=True,
- desc="is_room_blocked",
- )
-
@defer.inlineCallbacks
def block_room(self, room_id, user_id):
yield self._simple_insert(
@@ -507,7 +507,11 @@ class RoomStore(RoomWorkerStore, SearchStore):
},
desc="block_room",
)
- self.is_room_blocked.invalidate((room_id,))
+ yield self.runInteraction(
+ "block_room_invalidation",
+ self._invalidate_cache_and_stream,
+ self.is_room_blocked, (room_id,),
+ )
def get_media_mxcs_in_room(self, room_id):
"""Retrieves all the local and remote media MXC URIs in a given room
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 2b325e1c1f..ffa4246031 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -240,6 +240,9 @@ class StateGroupWorkerStore(SQLBaseStore):
(
"AND type = ? AND state_key = ?",
(etype, state_key)
+ ) if state_key is not None else (
+ "AND type = ?",
+ (etype,)
)
for etype, state_key in types
]
@@ -259,10 +262,19 @@ class StateGroupWorkerStore(SQLBaseStore):
key = (typ, state_key)
results[group][key] = event_id
else:
+ where_args = []
+ where_clauses = []
+ wildcard_types = False
if types is not None:
- where_clause = "AND (%s)" % (
- " OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
- )
+ for typ in types:
+ if typ[1] is None:
+ where_clauses.append("(type = ?)")
+ where_args.append(typ[0])
+ wildcard_types = True
+ else:
+ where_clauses.append("(type = ? AND state_key = ?)")
+ where_args.extend([typ[0], typ[1]])
+ where_clause = "AND (%s)" % (" OR ".join(where_clauses))
else:
where_clause = ""
@@ -279,7 +291,7 @@ class StateGroupWorkerStore(SQLBaseStore):
# after we finish deduping state, which requires this func)
args = [next_group]
if types:
- args.extend(i for typ in types for i in typ)
+ args.extend(where_args)
txn.execute(
"SELECT type, state_key, event_id FROM state_groups_state"
@@ -292,9 +304,17 @@ class StateGroupWorkerStore(SQLBaseStore):
if (typ, state_key) not in results[group]
)
- # If the lengths match then we must have all the types,
- # so no need to go walk further down the tree.
- if types is not None and len(results[group]) == len(types):
+ # If the number of entries in the (type,state_key)->event_id dict
+ # matches the number of (type, state_key) pairs we were searching
+ # for, then we must have found them all, so no need to go walk
+ # further down the tree... UNLESS our types filter contained
+ # wildcards (i.e. Nones) in which case we have to do an exhaustive
+ # search
+ if (
+ types is not None and
+ not wildcard_types and
+ len(results[group]) == len(types)
+ ):
break
next_group = self._simple_select_one_onecol_txn(
|