diff --git a/changelog.d/6126.feature b/changelog.d/6126.feature
new file mode 100644
index 0000000000..1207ba6206
--- /dev/null
+++ b/changelog.d/6126.feature
@@ -0,0 +1 @@
+Group events into larger federation transactions at times of high traffic.
diff --git a/changelog.d/6418.bugfix b/changelog.d/6418.bugfix
new file mode 100644
index 0000000000..a1f488d3a2
--- /dev/null
+++ b/changelog.d/6418.bugfix
@@ -0,0 +1 @@
+Fix phone home stats reporting.
diff --git a/changelog.d/6866.feature b/changelog.d/6866.feature
new file mode 100644
index 0000000000..256feab6ff
--- /dev/null
+++ b/changelog.d/6866.feature
@@ -0,0 +1 @@
+Add ability to run some group APIs on workers.
diff --git a/changelog.d/6873.feature b/changelog.d/6873.feature
new file mode 100644
index 0000000000..bbedf8f7f0
--- /dev/null
+++ b/changelog.d/6873.feature
@@ -0,0 +1 @@
+Add ability to route federation user device queries to workers.
diff --git a/docs/workers.md b/docs/workers.md
index 09a9d8a7b8..6f7ec58780 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -176,9 +176,15 @@ endpoints matching the following regular expressions:
^/_matrix/federation/v1/query_auth/
^/_matrix/federation/v1/event_auth/
^/_matrix/federation/v1/exchange_third_party_invite/
+ ^/_matrix/federation/v1/user/devices/
^/_matrix/federation/v1/send/
+ ^/_matrix/federation/v1/get_groups_publicised$
^/_matrix/key/v2/query
+Additionally, the following REST endpoints can be handled for GET requests:
+
+ ^/_matrix/federation/v1/groups/
+
The above endpoints should all be routed to the federation_reader worker by the
reverse-proxy configuration.
@@ -254,10 +260,13 @@ following regular expressions:
^/_matrix/client/(api/v1|r0|unstable)/keys/changes$
^/_matrix/client/versions$
^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$
+ ^/_matrix/client/(api/v1|r0|unstable)/joined_groups$
+ ^/_matrix/client/(api/v1|r0|unstable)/get_groups_publicised$
Additionally, the following REST endpoints can be handled for GET requests:
^/_matrix/client/(api/v1|r0|unstable)/pushrules/.*$
+ ^/_matrix/client/(api/v1|r0|unstable)/groups/.*$
Additionally, the following REST endpoints can be handled, but all requests must
be routed to the same instance:
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index ca96da6a4a..7fa91a3b11 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -57,6 +57,7 @@ from synapse.rest.client.v1.room import (
RoomStateRestServlet,
)
from synapse.rest.client.v1.voip import VoipRestServlet
+from synapse.rest.client.v2_alpha import groups
from synapse.rest.client.v2_alpha.account import ThreepidRestServlet
from synapse.rest.client.v2_alpha.keys import KeyChangesServlet, KeyQueryServlet
from synapse.rest.client.v2_alpha.register import RegisterRestServlet
@@ -124,6 +125,8 @@ class ClientReaderServer(HomeServer):
PushRuleRestServlet(self).register(resource)
VersionsRestServlet(self).register(resource)
+ groups.register_servlets(self, resource)
+
resources.update({"/_matrix/client": resource})
root_resource = create_resource_tree(resources, NoResource())
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 1f1cea1416..d055d11b23 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -33,8 +33,10 @@ from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
from synapse.replication.slave.storage._base import BaseSlavedStore
from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
from synapse.replication.slave.storage.directory import DirectoryStore
from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.groups import SlavedGroupServerStore
from synapse.replication.slave.storage.keys import SlavedKeyStore
from synapse.replication.slave.storage.profile import SlavedProfileStore
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
@@ -66,6 +68,8 @@ class FederationReaderSlavedStore(
SlavedEventStore,
SlavedKeyStore,
SlavedRegistrationStore,
+ SlavedGroupServerStore,
+ SlavedDeviceStore,
RoomStore,
DirectoryStore,
SlavedTransactionStore,
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index a4c97ed458..b3e4db507e 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -82,6 +82,8 @@ class FederationServer(FederationBase):
self.handler = hs.get_handlers().federation_handler
self.state = hs.get_state_handler()
+ self.device_handler = hs.get_device_handler()
+
self._server_linearizer = Linearizer("fed_server")
self._transaction_linearizer = Linearizer("fed_txn_handler")
@@ -528,8 +530,9 @@ class FederationServer(FederationBase):
def on_query_client_keys(self, origin, content):
return self.on_query_request("client_keys", content)
- def on_query_user_devices(self, origin, user_id):
- return self.on_query_request("user_devices", user_id)
+ async def on_query_user_devices(self, origin: str, user_id: str):
+ keys = await self.device_handler.on_federation_query_user_devices(user_id)
+ return 200, keys
@trace
async def on_claim_client_keys(self, origin, content):
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 36c83c3027..edf5d265e6 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -153,9 +153,24 @@ class FederationSender(object):
@defer.inlineCallbacks
def _process_event_queue_loop(self):
+ loop_start_time = self.clock.time_msec()
try:
self._is_processing = True
while True:
+ # if we've been going around this loop for a long time without
+ # catching up, deprioritise transaction transmission. This should mean
+ # that events get batched into fewer transactions, which is more
+ # efficient, and hence give us a chance to catch up
+ if (
+ self.clock.time_msec() - loop_start_time > 60 * 1000
+ and not self._transaction_manager.deprioritise_transmission
+ ):
+ logger.warning(
+ "Event queue is getting behind: deprioritising transaction "
+ "transmission"
+ )
+ self._transaction_manager.deprioritise_transmission = True
+
last_token = yield self.store.get_federation_out_pos("events")
next_token, events = yield self.store.get_all_new_events_stream(
last_token, self._last_poked_id, limit=100
@@ -253,6 +268,9 @@ class FederationSender(object):
finally:
self._is_processing = False
+ if self._transaction_manager.deprioritise_transmission:
+ logger.info("Event queue caught up: re-prioritising transmission")
+ self._transaction_manager.deprioritise_transmission = False
def _send_pdu(self, pdu, destinations):
# We loop through all destinations to see whether we already have
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 5012aaea35..74f81d8260 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,6 +15,7 @@
# limitations under the License.
import datetime
import logging
+import random
from prometheus_client import Counter
@@ -37,6 +38,8 @@ from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
# This is defined in the Matrix spec and enforced by the receiver.
MAX_EDUS_PER_TRANSACTION = 100
+DEPRIORITISE_SLEEP_TIME = 10
+
logger = logging.getLogger(__name__)
@@ -190,6 +193,18 @@ class PerDestinationQueue(object):
pending_pdus = []
while True:
+ if self._transaction_manager.deprioritise_transmission:
+ # if the event-processing loop has got behind, sleep to give it
+ # a chance to catch up. Add some randomness so that the transmitters
+ # don't all wake up in sync.
+ sleeptime = random.uniform(
+ DEPRIORITISE_SLEEP_TIME, DEPRIORITISE_SLEEP_TIME * 2
+ )
+ logger.info(
+ "TX [%s]: sleeping for %f seconds", self._destination, sleeptime
+ )
+ yield self._clock.sleep(sleeptime)
+
# We have to keep 2 free slots for presence and rr_edus
limit = MAX_EDUS_PER_TRANSACTION - 2
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 5fed626d5b..ca558fa242 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -49,6 +49,10 @@ class TransactionManager(object):
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
+ # the federation sender sometimes sets this to delay transaction transmission,
+ # if the sender gets behind.
+ self.deprioritise_transmission = False
+
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def send_new_transaction(self, destination, pending_pdus, pending_edus):
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index dc563538de..9c6f22760d 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -152,7 +152,7 @@ class TransportLayerClient(object):
# generated by the json_data_callback.
json_data = transaction.get_dict()
- path = _create_v1_path("/send/%s", transaction.transaction_id)
+ path = _create_v1_path("/send/%s/", transaction.transaction_id)
response = yield self.client.put_json(
transaction.destination,
@@ -161,7 +161,7 @@ class TransportLayerClient(object):
json_data_callback=json_data_callback,
long_retries=True,
backoff_on_404=True, # If we get a 404 the other side has gone
- try_trailing_slash_on_400=True,
+ # try_trailing_slash_on_400=True,
)
return response
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index 0ec9be3cb5..c106abae21 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -36,7 +36,7 @@ logger = logging.getLogger(__name__)
# TODO: Flairs
-class GroupsServerHandler(object):
+class GroupsServerWorkerHandler(object):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
@@ -51,9 +51,6 @@ class GroupsServerHandler(object):
self.transport_client = hs.get_federation_transport_client()
self.profile_handler = hs.get_profile_handler()
- # Ensure attestations get renewed
- hs.get_groups_attestation_renewer()
-
@defer.inlineCallbacks
def check_group_is_ours(
self, group_id, requester_user_id, and_exists=False, and_is_admin=None
@@ -168,6 +165,197 @@ class GroupsServerHandler(object):
}
@defer.inlineCallbacks
+ def get_group_categories(self, group_id, requester_user_id):
+ """Get all categories in a group (as seen by user)
+ """
+ yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+ categories = yield self.store.get_group_categories(group_id=group_id)
+ return {"categories": categories}
+
+ @defer.inlineCallbacks
+ def get_group_category(self, group_id, requester_user_id, category_id):
+ """Get a specific category in a group (as seen by user)
+ """
+ yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+ res = yield self.store.get_group_category(
+ group_id=group_id, category_id=category_id
+ )
+
+ logger.info("group %s", res)
+
+ return res
+
+ @defer.inlineCallbacks
+ def get_group_roles(self, group_id, requester_user_id):
+ """Get all roles in a group (as seen by user)
+ """
+ yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+ roles = yield self.store.get_group_roles(group_id=group_id)
+ return {"roles": roles}
+
+ @defer.inlineCallbacks
+ def get_group_role(self, group_id, requester_user_id, role_id):
+ """Get a specific role in a group (as seen by user)
+ """
+ yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+ res = yield self.store.get_group_role(group_id=group_id, role_id=role_id)
+ return res
+
+ @defer.inlineCallbacks
+ def get_group_profile(self, group_id, requester_user_id):
+ """Get the group profile as seen by requester_user_id
+ """
+
+ yield self.check_group_is_ours(group_id, requester_user_id)
+
+ group = yield self.store.get_group(group_id)
+
+ if group:
+ cols = [
+ "name",
+ "short_description",
+ "long_description",
+ "avatar_url",
+ "is_public",
+ ]
+ group_description = {key: group[key] for key in cols}
+ group_description["is_openly_joinable"] = group["join_policy"] == "open"
+
+ return group_description
+ else:
+ raise SynapseError(404, "Unknown group")
+
+ @defer.inlineCallbacks
+ def get_users_in_group(self, group_id, requester_user_id):
+ """Get the users in group as seen by requester_user_id.
+
+ The ordering is arbitrary at the moment
+ """
+
+ yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+ is_user_in_group = yield self.store.is_user_in_group(
+ requester_user_id, group_id
+ )
+
+ user_results = yield self.store.get_users_in_group(
+ group_id, include_private=is_user_in_group
+ )
+
+ chunk = []
+ for user_result in user_results:
+ g_user_id = user_result["user_id"]
+ is_public = user_result["is_public"]
+ is_privileged = user_result["is_admin"]
+
+ entry = {"user_id": g_user_id}
+
+ profile = yield self.profile_handler.get_profile_from_cache(g_user_id)
+ entry.update(profile)
+
+ entry["is_public"] = bool(is_public)
+ entry["is_privileged"] = bool(is_privileged)
+
+ if not self.is_mine_id(g_user_id):
+ attestation = yield self.store.get_remote_attestation(
+ group_id, g_user_id
+ )
+ if not attestation:
+ continue
+
+ entry["attestation"] = attestation
+ else:
+ entry["attestation"] = self.attestations.create_attestation(
+ group_id, g_user_id
+ )
+
+ chunk.append(entry)
+
+ # TODO: If admin add lists of users whose attestations have timed out
+
+ return {"chunk": chunk, "total_user_count_estimate": len(user_results)}
+
+ @defer.inlineCallbacks
+ def get_invited_users_in_group(self, group_id, requester_user_id):
+ """Get the users that have been invited to a group as seen by requester_user_id.
+
+ The ordering is arbitrary at the moment
+ """
+
+ yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+ is_user_in_group = yield self.store.is_user_in_group(
+ requester_user_id, group_id
+ )
+
+ if not is_user_in_group:
+ raise SynapseError(403, "User not in group")
+
+ invited_users = yield self.store.get_invited_users_in_group(group_id)
+
+ user_profiles = []
+
+ for user_id in invited_users:
+ user_profile = {"user_id": user_id}
+ try:
+ profile = yield self.profile_handler.get_profile_from_cache(user_id)
+ user_profile.update(profile)
+ except Exception as e:
+ logger.warning("Error getting profile for %s: %s", user_id, e)
+ user_profiles.append(user_profile)
+
+ return {"chunk": user_profiles, "total_user_count_estimate": len(invited_users)}
+
+ @defer.inlineCallbacks
+ def get_rooms_in_group(self, group_id, requester_user_id):
+ """Get the rooms in group as seen by requester_user_id
+
+ This returns rooms in order of decreasing number of joined users
+ """
+
+ yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+ is_user_in_group = yield self.store.is_user_in_group(
+ requester_user_id, group_id
+ )
+
+ room_results = yield self.store.get_rooms_in_group(
+ group_id, include_private=is_user_in_group
+ )
+
+ chunk = []
+ for room_result in room_results:
+ room_id = room_result["room_id"]
+
+ joined_users = yield self.store.get_users_in_room(room_id)
+ entry = yield self.room_list_handler.generate_room_entry(
+ room_id, len(joined_users), with_alias=False, allow_private=True
+ )
+
+ if not entry:
+ continue
+
+ entry["is_public"] = bool(room_result["is_public"])
+
+ chunk.append(entry)
+
+ chunk.sort(key=lambda e: -e["num_joined_members"])
+
+ return {"chunk": chunk, "total_room_count_estimate": len(room_results)}
+
+
+class GroupsServerHandler(GroupsServerWorkerHandler):
+ def __init__(self, hs):
+ super(GroupsServerHandler, self).__init__(hs)
+
+ # Ensure attestations get renewed
+ hs.get_groups_attestation_renewer()
+
+ @defer.inlineCallbacks
def update_group_summary_room(
self, group_id, requester_user_id, room_id, category_id, content
):
@@ -230,27 +418,6 @@ class GroupsServerHandler(object):
return {}
@defer.inlineCallbacks
- def get_group_categories(self, group_id, requester_user_id):
- """Get all categories in a group (as seen by user)
- """
- yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
- categories = yield self.store.get_group_categories(group_id=group_id)
- return {"categories": categories}
-
- @defer.inlineCallbacks
- def get_group_category(self, group_id, requester_user_id, category_id):
- """Get a specific category in a group (as seen by user)
- """
- yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
- res = yield self.store.get_group_category(
- group_id=group_id, category_id=category_id
- )
-
- return res
-
- @defer.inlineCallbacks
def update_group_category(self, group_id, requester_user_id, category_id, content):
"""Add/Update a group category
"""
@@ -285,24 +452,6 @@ class GroupsServerHandler(object):
return {}
@defer.inlineCallbacks
- def get_group_roles(self, group_id, requester_user_id):
- """Get all roles in a group (as seen by user)
- """
- yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
- roles = yield self.store.get_group_roles(group_id=group_id)
- return {"roles": roles}
-
- @defer.inlineCallbacks
- def get_group_role(self, group_id, requester_user_id, role_id):
- """Get a specific role in a group (as seen by user)
- """
- yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
- res = yield self.store.get_group_role(group_id=group_id, role_id=role_id)
- return res
-
- @defer.inlineCallbacks
def update_group_role(self, group_id, requester_user_id, role_id, content):
"""Add/update a role in a group
"""
@@ -371,30 +520,6 @@ class GroupsServerHandler(object):
return {}
@defer.inlineCallbacks
- def get_group_profile(self, group_id, requester_user_id):
- """Get the group profile as seen by requester_user_id
- """
-
- yield self.check_group_is_ours(group_id, requester_user_id)
-
- group = yield self.store.get_group(group_id)
-
- if group:
- cols = [
- "name",
- "short_description",
- "long_description",
- "avatar_url",
- "is_public",
- ]
- group_description = {key: group[key] for key in cols}
- group_description["is_openly_joinable"] = group["join_policy"] == "open"
-
- return group_description
- else:
- raise SynapseError(404, "Unknown group")
-
- @defer.inlineCallbacks
def update_group_profile(self, group_id, requester_user_id, content):
"""Update the group profile
"""
@@ -413,124 +538,6 @@ class GroupsServerHandler(object):
yield self.store.update_group_profile(group_id, profile)
@defer.inlineCallbacks
- def get_users_in_group(self, group_id, requester_user_id):
- """Get the users in group as seen by requester_user_id.
-
- The ordering is arbitrary at the moment
- """
-
- yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
- is_user_in_group = yield self.store.is_user_in_group(
- requester_user_id, group_id
- )
-
- user_results = yield self.store.get_users_in_group(
- group_id, include_private=is_user_in_group
- )
-
- chunk = []
- for user_result in user_results:
- g_user_id = user_result["user_id"]
- is_public = user_result["is_public"]
- is_privileged = user_result["is_admin"]
-
- entry = {"user_id": g_user_id}
-
- profile = yield self.profile_handler.get_profile_from_cache(g_user_id)
- entry.update(profile)
-
- entry["is_public"] = bool(is_public)
- entry["is_privileged"] = bool(is_privileged)
-
- if not self.is_mine_id(g_user_id):
- attestation = yield self.store.get_remote_attestation(
- group_id, g_user_id
- )
- if not attestation:
- continue
-
- entry["attestation"] = attestation
- else:
- entry["attestation"] = self.attestations.create_attestation(
- group_id, g_user_id
- )
-
- chunk.append(entry)
-
- # TODO: If admin add lists of users whose attestations have timed out
-
- return {"chunk": chunk, "total_user_count_estimate": len(user_results)}
-
- @defer.inlineCallbacks
- def get_invited_users_in_group(self, group_id, requester_user_id):
- """Get the users that have been invited to a group as seen by requester_user_id.
-
- The ordering is arbitrary at the moment
- """
-
- yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
- is_user_in_group = yield self.store.is_user_in_group(
- requester_user_id, group_id
- )
-
- if not is_user_in_group:
- raise SynapseError(403, "User not in group")
-
- invited_users = yield self.store.get_invited_users_in_group(group_id)
-
- user_profiles = []
-
- for user_id in invited_users:
- user_profile = {"user_id": user_id}
- try:
- profile = yield self.profile_handler.get_profile_from_cache(user_id)
- user_profile.update(profile)
- except Exception as e:
- logger.warning("Error getting profile for %s: %s", user_id, e)
- user_profiles.append(user_profile)
-
- return {"chunk": user_profiles, "total_user_count_estimate": len(invited_users)}
-
- @defer.inlineCallbacks
- def get_rooms_in_group(self, group_id, requester_user_id):
- """Get the rooms in group as seen by requester_user_id
-
- This returns rooms in order of decreasing number of joined users
- """
-
- yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
- is_user_in_group = yield self.store.is_user_in_group(
- requester_user_id, group_id
- )
-
- room_results = yield self.store.get_rooms_in_group(
- group_id, include_private=is_user_in_group
- )
-
- chunk = []
- for room_result in room_results:
- room_id = room_result["room_id"]
-
- joined_users = yield self.store.get_users_in_room(room_id)
- entry = yield self.room_list_handler.generate_room_entry(
- room_id, len(joined_users), with_alias=False, allow_private=True
- )
-
- if not entry:
- continue
-
- entry["is_public"] = bool(room_result["is_public"])
-
- chunk.append(entry)
-
- chunk.sort(key=lambda e: -e["num_joined_members"])
-
- return {"chunk": chunk, "total_room_count_estimate": len(room_results)}
-
- @defer.inlineCallbacks
def add_room_to_group(self, group_id, requester_user_id, room_id, content):
"""Add room to group
"""
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index a9bd431486..6d8e48ed39 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -225,6 +225,22 @@ class DeviceWorkerHandler(BaseHandler):
return result
+ @defer.inlineCallbacks
+ def on_federation_query_user_devices(self, user_id):
+ stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
+ master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
+ self_signing_key = yield self.store.get_e2e_cross_signing_key(
+ user_id, "self_signing"
+ )
+
+ return {
+ "user_id": user_id,
+ "stream_id": stream_id,
+ "devices": devices,
+ "master_key": master_key,
+ "self_signing_key": self_signing_key,
+ }
+
class DeviceHandler(DeviceWorkerHandler):
def __init__(self, hs):
@@ -239,9 +255,6 @@ class DeviceHandler(DeviceWorkerHandler):
federation_registry.register_edu_handler(
"m.device_list_update", self.device_list_updater.incoming_device_list_update
)
- federation_registry.register_query_handler(
- "user_devices", self.on_federation_query_user_devices
- )
hs.get_distributor().observe("user_left_room", self.user_left_room)
@@ -457,22 +470,6 @@ class DeviceHandler(DeviceWorkerHandler):
self.notifier.on_new_event("device_list_key", position, users=[from_user_id])
@defer.inlineCallbacks
- def on_federation_query_user_devices(self, user_id):
- stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
- master_key = yield self.store.get_e2e_cross_signing_key(user_id, "master")
- self_signing_key = yield self.store.get_e2e_cross_signing_key(
- user_id, "self_signing"
- )
-
- return {
- "user_id": user_id,
- "stream_id": stream_id,
- "devices": devices,
- "master_key": master_key,
- "self_signing_key": self_signing_key,
- }
-
- @defer.inlineCallbacks
def user_left_room(self, user, room_id):
user_id = user.to_string()
room_ids = yield self.store.get_rooms_for_user(user_id)
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 319565510f..ad22415782 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -63,7 +63,7 @@ def _create_rerouter(func_name):
return f
-class GroupsLocalHandler(object):
+class GroupsLocalWorkerHandler(object):
def __init__(self, hs):
self.hs = hs
self.store = hs.get_datastore()
@@ -81,40 +81,17 @@ class GroupsLocalHandler(object):
self.profile_handler = hs.get_profile_handler()
- # Ensure attestations get renewed
- hs.get_groups_attestation_renewer()
-
# The following functions merely route the query to the local groups server
# or federation depending on if the group is local or remote
get_group_profile = _create_rerouter("get_group_profile")
- update_group_profile = _create_rerouter("update_group_profile")
get_rooms_in_group = _create_rerouter("get_rooms_in_group")
-
get_invited_users_in_group = _create_rerouter("get_invited_users_in_group")
-
- add_room_to_group = _create_rerouter("add_room_to_group")
- update_room_in_group = _create_rerouter("update_room_in_group")
- remove_room_from_group = _create_rerouter("remove_room_from_group")
-
- update_group_summary_room = _create_rerouter("update_group_summary_room")
- delete_group_summary_room = _create_rerouter("delete_group_summary_room")
-
- update_group_category = _create_rerouter("update_group_category")
- delete_group_category = _create_rerouter("delete_group_category")
get_group_category = _create_rerouter("get_group_category")
get_group_categories = _create_rerouter("get_group_categories")
-
- update_group_summary_user = _create_rerouter("update_group_summary_user")
- delete_group_summary_user = _create_rerouter("delete_group_summary_user")
-
- update_group_role = _create_rerouter("update_group_role")
- delete_group_role = _create_rerouter("delete_group_role")
get_group_role = _create_rerouter("get_group_role")
get_group_roles = _create_rerouter("get_group_roles")
- set_group_join_policy = _create_rerouter("set_group_join_policy")
-
@defer.inlineCallbacks
def get_group_summary(self, group_id, requester_user_id):
"""Get the group summary for a group.
@@ -170,6 +147,144 @@ class GroupsLocalHandler(object):
return res
@defer.inlineCallbacks
+ def get_users_in_group(self, group_id, requester_user_id):
+ """Get users in a group
+ """
+ if self.is_mine_id(group_id):
+ res = yield self.groups_server_handler.get_users_in_group(
+ group_id, requester_user_id
+ )
+ return res
+
+ group_server_name = get_domain_from_id(group_id)
+
+ try:
+ res = yield self.transport_client.get_users_in_group(
+ get_domain_from_id(group_id), group_id, requester_user_id
+ )
+ except HttpResponseException as e:
+ raise e.to_synapse_error()
+ except RequestSendFailed:
+ raise SynapseError(502, "Failed to contact group server")
+
+ chunk = res["chunk"]
+ valid_entries = []
+ for entry in chunk:
+ g_user_id = entry["user_id"]
+ attestation = entry.pop("attestation", {})
+ try:
+ if get_domain_from_id(g_user_id) != group_server_name:
+ yield self.attestations.verify_attestation(
+ attestation,
+ group_id=group_id,
+ user_id=g_user_id,
+ server_name=get_domain_from_id(g_user_id),
+ )
+ valid_entries.append(entry)
+ except Exception as e:
+ logger.info("Failed to verify user is in group: %s", e)
+
+ res["chunk"] = valid_entries
+
+ return res
+
+ @defer.inlineCallbacks
+ def get_joined_groups(self, user_id):
+ group_ids = yield self.store.get_joined_groups(user_id)
+ return {"groups": group_ids}
+
+ @defer.inlineCallbacks
+ def get_publicised_groups_for_user(self, user_id):
+ if self.hs.is_mine_id(user_id):
+ result = yield self.store.get_publicised_groups_for_user(user_id)
+
+ # Check AS associated groups for this user - this depends on the
+ # RegExps in the AS registration file (under `users`)
+ for app_service in self.store.get_app_services():
+ result.extend(app_service.get_groups_for_user(user_id))
+
+ return {"groups": result}
+ else:
+ try:
+ bulk_result = yield self.transport_client.bulk_get_publicised_groups(
+ get_domain_from_id(user_id), [user_id]
+ )
+ except HttpResponseException as e:
+ raise e.to_synapse_error()
+ except RequestSendFailed:
+ raise SynapseError(502, "Failed to contact group server")
+
+ result = bulk_result.get("users", {}).get(user_id)
+ # TODO: Verify attestations
+ return {"groups": result}
+
+ @defer.inlineCallbacks
+ def bulk_get_publicised_groups(self, user_ids, proxy=True):
+ destinations = {}
+ local_users = set()
+
+ for user_id in user_ids:
+ if self.hs.is_mine_id(user_id):
+ local_users.add(user_id)
+ else:
+ destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id)
+
+ if not proxy and destinations:
+ raise SynapseError(400, "Some user_ids are not local")
+
+ results = {}
+ failed_results = []
+ for destination, dest_user_ids in iteritems(destinations):
+ try:
+ r = yield self.transport_client.bulk_get_publicised_groups(
+ destination, list(dest_user_ids)
+ )
+ results.update(r["users"])
+ except Exception:
+ failed_results.extend(dest_user_ids)
+
+ for uid in local_users:
+ results[uid] = yield self.store.get_publicised_groups_for_user(uid)
+
+ # Check AS associated groups for this user - this depends on the
+ # RegExps in the AS registration file (under `users`)
+ for app_service in self.store.get_app_services():
+ results[uid].extend(app_service.get_groups_for_user(uid))
+
+ return {"users": results}
+
+
+class GroupsLocalHandler(GroupsLocalWorkerHandler):
+ def __init__(self, hs):
+ super(GroupsLocalHandler, self).__init__(hs)
+
+ # Ensure attestations get renewed
+ hs.get_groups_attestation_renewer()
+
+ # The following functions merely route the query to the local groups server
+ # or federation depending on if the group is local or remote
+
+ update_group_profile = _create_rerouter("update_group_profile")
+
+ add_room_to_group = _create_rerouter("add_room_to_group")
+ update_room_in_group = _create_rerouter("update_room_in_group")
+ remove_room_from_group = _create_rerouter("remove_room_from_group")
+
+ update_group_summary_room = _create_rerouter("update_group_summary_room")
+ delete_group_summary_room = _create_rerouter("delete_group_summary_room")
+
+ update_group_category = _create_rerouter("update_group_category")
+ delete_group_category = _create_rerouter("delete_group_category")
+
+ update_group_summary_user = _create_rerouter("update_group_summary_user")
+ delete_group_summary_user = _create_rerouter("delete_group_summary_user")
+
+ update_group_role = _create_rerouter("update_group_role")
+ delete_group_role = _create_rerouter("delete_group_role")
+
+ set_group_join_policy = _create_rerouter("set_group_join_policy")
+
+ @defer.inlineCallbacks
def create_group(self, group_id, user_id, content):
"""Create a group
"""
@@ -220,48 +335,6 @@ class GroupsLocalHandler(object):
return res
@defer.inlineCallbacks
- def get_users_in_group(self, group_id, requester_user_id):
- """Get users in a group
- """
- if self.is_mine_id(group_id):
- res = yield self.groups_server_handler.get_users_in_group(
- group_id, requester_user_id
- )
- return res
-
- group_server_name = get_domain_from_id(group_id)
-
- try:
- res = yield self.transport_client.get_users_in_group(
- get_domain_from_id(group_id), group_id, requester_user_id
- )
- except HttpResponseException as e:
- raise e.to_synapse_error()
- except RequestSendFailed:
- raise SynapseError(502, "Failed to contact group server")
-
- chunk = res["chunk"]
- valid_entries = []
- for entry in chunk:
- g_user_id = entry["user_id"]
- attestation = entry.pop("attestation", {})
- try:
- if get_domain_from_id(g_user_id) != group_server_name:
- yield self.attestations.verify_attestation(
- attestation,
- group_id=group_id,
- user_id=g_user_id,
- server_name=get_domain_from_id(g_user_id),
- )
- valid_entries.append(entry)
- except Exception as e:
- logger.info("Failed to verify user is in group: %s", e)
-
- res["chunk"] = valid_entries
-
- return res
-
- @defer.inlineCallbacks
def join_group(self, group_id, user_id, content):
"""Request to join a group
"""
@@ -452,68 +525,3 @@ class GroupsLocalHandler(object):
group_id, user_id, membership="leave"
)
self.notifier.on_new_event("groups_key", token, users=[user_id])
-
- @defer.inlineCallbacks
- def get_joined_groups(self, user_id):
- group_ids = yield self.store.get_joined_groups(user_id)
- return {"groups": group_ids}
-
- @defer.inlineCallbacks
- def get_publicised_groups_for_user(self, user_id):
- if self.hs.is_mine_id(user_id):
- result = yield self.store.get_publicised_groups_for_user(user_id)
-
- # Check AS associated groups for this user - this depends on the
- # RegExps in the AS registration file (under `users`)
- for app_service in self.store.get_app_services():
- result.extend(app_service.get_groups_for_user(user_id))
-
- return {"groups": result}
- else:
- try:
- bulk_result = yield self.transport_client.bulk_get_publicised_groups(
- get_domain_from_id(user_id), [user_id]
- )
- except HttpResponseException as e:
- raise e.to_synapse_error()
- except RequestSendFailed:
- raise SynapseError(502, "Failed to contact group server")
-
- result = bulk_result.get("users", {}).get(user_id)
- # TODO: Verify attestations
- return {"groups": result}
-
- @defer.inlineCallbacks
- def bulk_get_publicised_groups(self, user_ids, proxy=True):
- destinations = {}
- local_users = set()
-
- for user_id in user_ids:
- if self.hs.is_mine_id(user_id):
- local_users.add(user_id)
- else:
- destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id)
-
- if not proxy and destinations:
- raise SynapseError(400, "Some user_ids are not local")
-
- results = {}
- failed_results = []
- for destination, dest_user_ids in iteritems(destinations):
- try:
- r = yield self.transport_client.bulk_get_publicised_groups(
- destination, list(dest_user_ids)
- )
- results.update(r["users"])
- except Exception:
- failed_results.extend(dest_user_ids)
-
- for uid in local_users:
- results[uid] = yield self.store.get_publicised_groups_for_user(uid)
-
- # Check AS associated groups for this user - this depends on the
- # RegExps in the AS registration file (under `users`)
- for app_service in self.store.get_app_services():
- results[uid].extend(app_service.get_groups_for_user(uid))
-
- return {"users": results}
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index bdf16c84d3..9743c0e0d7 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -229,7 +229,7 @@ class MessageHandler(object):
# If this is an AS, double check that they are allowed to see the members.
# This can either be because the AS user is in the room or because there
# is a user in the room that the AS is "interested in"
- if requester.app_service and user_id not in users_with_profile:
+ if False and requester.app_service and user_id not in users_with_profile:
for uid in users_with_profile:
if requester.app_service.is_interested_in_user(uid):
break
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index c615206df1..2252a86f77 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -43,7 +43,8 @@ class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.enable_room_list_search = hs.config.enable_room_list_search
- self.response_cache = ResponseCache(hs, "room_list")
+
+ self.response_cache = ResponseCache(hs, "room_list", timeout_ms=10 * 60 * 1000)
self.remote_response_cache = ResponseCache(
hs, "remote_room_list", timeout_ms=30 * 1000
)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 15e8aa5249..dde448c296 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -62,6 +62,7 @@ class RoomMemberHandler(object):
self.event_creation_handler = hs.get_event_creation_handler()
self.member_linearizer = Linearizer(name="member")
+ self.member_limiter = Linearizer(max_count=10, name="member_as_limiter")
self.clock = hs.get_clock()
self.spam_checker = hs.get_spam_checker()
@@ -269,19 +270,38 @@ class RoomMemberHandler(object):
):
key = (room_id,)
- with (yield self.member_linearizer.queue(key)):
- result = yield self._update_membership(
- requester,
- target,
- room_id,
- action,
- txn_id=txn_id,
- remote_room_hosts=remote_room_hosts,
- third_party_signed=third_party_signed,
- ratelimit=ratelimit,
- content=content,
- require_consent=require_consent,
- )
+ as_id = object()
+ if requester.app_service:
+ as_id = requester.app_service.id
+
+ then = self.clock.time_msec()
+
+ with (yield self.member_limiter.queue(as_id)):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took to long to process")
+
+ with (yield self.member_linearizer.queue(key)):
+ diff = self.clock.time_msec() - then
+
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took to long to process")
+
+ result = yield self._update_membership(
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=txn_id,
+ remote_room_hosts=remote_room_hosts,
+ third_party_signed=third_party_signed,
+ ratelimit=ratelimit,
+ content=content,
+ require_consent=require_consent,
+ )
return result
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index cd95f85e3f..83389d19db 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -40,6 +40,7 @@ logger = logging.getLogger(__name__)
# Debug logger for https://github.com/matrix-org/synapse/issues/4422
issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug")
+SYNC_RESPONSE_CACHE_MS = 2 * 60 * 1000
# Counts the number of times we returned a non-empty sync. `type` is one of
# "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
@@ -225,7 +226,9 @@ class SyncHandler(object):
self.presence_handler = hs.get_presence_handler()
self.event_sources = hs.get_event_sources()
self.clock = hs.get_clock()
- self.response_cache = ResponseCache(hs, "sync")
+ self.response_cache = ResponseCache(
+ hs, "sync", timeout_ms=SYNC_RESPONSE_CACHE_MS
+ )
self.state = hs.get_state_handler()
self.auth = hs.get_auth()
self.storage = hs.get_storage()
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index 7ddfad286d..b82c8a84f4 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -103,6 +103,10 @@ class WellKnownResolver(object):
Returns:
Deferred[WellKnownLookupResult]: The result of the lookup
"""
+
+ if server_name == b"kde.org":
+ return WellKnownLookupResult(delegated_server=b"kde.modular.im:443")
+
try:
prev_result, expiry, ttl = self._well_known_cache.get_with_expiry(
server_name
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index d0879b0490..afa9ef31bf 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -103,6 +103,10 @@ class HttpPusher(object):
if "url" not in self.data:
raise PusherConfigException("'url' required in data for HTTP pusher")
self.url = self.data["url"]
+ self.url = self.url.replace(
+ "https://matrix.org/_matrix/push/v1/notify",
+ "http://10.103.0.7/_matrix/push/v1/notify",
+ )
self.http_client = hs.get_proxied_http_client()
self.data_minus_url = {}
self.data_minus_url.update(self.data)
diff --git a/synapse/replication/slave/storage/groups.py b/synapse/replication/slave/storage/groups.py
index 69a4ae42f9..2d4fd08cf5 100644
--- a/synapse/replication/slave/storage/groups.py
+++ b/synapse/replication/slave/storage/groups.py
@@ -13,15 +13,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.storage import DataStore
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker
+from synapse.storage.data_stores.main.group_server import GroupServerWorkerStore
from synapse.storage.database import Database
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from ._base import BaseSlavedStore, __func__
-from ._slaved_id_tracker import SlavedIdTracker
-
-class SlavedGroupServerStore(BaseSlavedStore):
+class SlavedGroupServerStore(GroupServerWorkerStore, BaseSlavedStore):
def __init__(self, database: Database, db_conn, hs):
super(SlavedGroupServerStore, self).__init__(database, db_conn, hs)
@@ -35,9 +34,8 @@ class SlavedGroupServerStore(BaseSlavedStore):
self._group_updates_id_gen.get_current_token(),
)
- get_groups_changes_for_user = __func__(DataStore.get_groups_changes_for_user)
- get_group_stream_token = __func__(DataStore.get_group_stream_token)
- get_all_groups_for_user = __func__(DataStore.get_all_groups_for_user)
+ def get_group_stream_token(self):
+ return self._group_updates_id_gen.get_current_token()
def stream_positions(self):
result = super(SlavedGroupServerStore, self).stream_positions()
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index a8d568b14a..208e8a667b 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -24,7 +24,7 @@ import attr
logger = logging.getLogger(__name__)
-MAX_EVENTS_BEHIND = 10000
+MAX_EVENTS_BEHIND = 500000
BackfillStreamRow = namedtuple(
"BackfillStreamRow",
diff --git a/synapse/server.py b/synapse/server.py
index 7926867b77..fd2f69e928 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -50,7 +50,7 @@ from synapse.federation.send_queue import FederationRemoteSendQueue
from synapse.federation.sender import FederationSender
from synapse.federation.transport.client import TransportLayerClient
from synapse.groups.attestations import GroupAttestationSigning, GroupAttestionRenewer
-from synapse.groups.groups_server import GroupsServerHandler
+from synapse.groups.groups_server import GroupsServerHandler, GroupsServerWorkerHandler
from synapse.handlers import Handlers
from synapse.handlers.account_validity import AccountValidityHandler
from synapse.handlers.acme import AcmeHandler
@@ -62,7 +62,7 @@ from synapse.handlers.devicemessage import DeviceMessageHandler
from synapse.handlers.e2e_keys import E2eKeysHandler
from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler
from synapse.handlers.events import EventHandler, EventStreamHandler
-from synapse.handlers.groups_local import GroupsLocalHandler
+from synapse.handlers.groups_local import GroupsLocalHandler, GroupsLocalWorkerHandler
from synapse.handlers.initial_sync import InitialSyncHandler
from synapse.handlers.message import EventCreationHandler, MessageHandler
from synapse.handlers.pagination import PaginationHandler
@@ -460,10 +460,16 @@ class HomeServer(object):
return UserDirectoryHandler(self)
def build_groups_local_handler(self):
- return GroupsLocalHandler(self)
+ if self.config.worker_app:
+ return GroupsLocalWorkerHandler(self)
+ else:
+ return GroupsLocalHandler(self)
def build_groups_server_handler(self):
- return GroupsServerHandler(self)
+ if self.config.worker_app:
+ return GroupsServerWorkerHandler(self)
+ else:
+ return GroupsServerHandler(self)
def build_groups_attestation_signing(self):
return GroupAttestationSigning(self)
diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py
index 13f4c9c72e..68f6a85a83 100644
--- a/synapse/storage/data_stores/main/client_ips.py
+++ b/synapse/storage/data_stores/main/client_ips.py
@@ -30,7 +30,7 @@ logger = logging.getLogger(__name__)
# Number of msec of granularity to store the user IP 'last seen' time. Smaller
# times give more inserts into the database even for readonly API hits
# 120 seconds == 2 minutes
-LAST_SEEN_GRANULARITY = 120 * 1000
+LAST_SEEN_GRANULARITY = 10 * 60 * 1000
class ClientIpBackgroundUpdateStore(SQLBaseStore):
diff --git a/synapse/storage/data_stores/main/group_server.py b/synapse/storage/data_stores/main/group_server.py
index 6acd45e9f3..0963e6c250 100644
--- a/synapse/storage/data_stores/main/group_server.py
+++ b/synapse/storage/data_stores/main/group_server.py
@@ -27,21 +27,7 @@ _DEFAULT_CATEGORY_ID = ""
_DEFAULT_ROLE_ID = ""
-class GroupServerStore(SQLBaseStore):
- def set_group_join_policy(self, group_id, join_policy):
- """Set the join policy of a group.
-
- join_policy can be one of:
- * "invite"
- * "open"
- """
- return self.db.simple_update_one(
- table="groups",
- keyvalues={"group_id": group_id},
- updatevalues={"join_policy": join_policy},
- desc="set_group_join_policy",
- )
-
+class GroupServerWorkerStore(SQLBaseStore):
def get_group(self, group_id):
return self.db.simple_select_one(
table="groups",
@@ -157,6 +143,366 @@ class GroupServerStore(SQLBaseStore):
"get_rooms_for_summary", _get_rooms_for_summary_txn
)
+ @defer.inlineCallbacks
+ def get_group_categories(self, group_id):
+ rows = yield self.db.simple_select_list(
+ table="group_room_categories",
+ keyvalues={"group_id": group_id},
+ retcols=("category_id", "is_public", "profile"),
+ desc="get_group_categories",
+ )
+
+ return {
+ row["category_id"]: {
+ "is_public": row["is_public"],
+ "profile": json.loads(row["profile"]),
+ }
+ for row in rows
+ }
+
+ @defer.inlineCallbacks
+ def get_group_category(self, group_id, category_id):
+ category = yield self.db.simple_select_one(
+ table="group_room_categories",
+ keyvalues={"group_id": group_id, "category_id": category_id},
+ retcols=("is_public", "profile"),
+ desc="get_group_category",
+ )
+
+ category["profile"] = json.loads(category["profile"])
+
+ return category
+
+ @defer.inlineCallbacks
+ def get_group_roles(self, group_id):
+ rows = yield self.db.simple_select_list(
+ table="group_roles",
+ keyvalues={"group_id": group_id},
+ retcols=("role_id", "is_public", "profile"),
+ desc="get_group_roles",
+ )
+
+ return {
+ row["role_id"]: {
+ "is_public": row["is_public"],
+ "profile": json.loads(row["profile"]),
+ }
+ for row in rows
+ }
+
+ @defer.inlineCallbacks
+ def get_group_role(self, group_id, role_id):
+ role = yield self.db.simple_select_one(
+ table="group_roles",
+ keyvalues={"group_id": group_id, "role_id": role_id},
+ retcols=("is_public", "profile"),
+ desc="get_group_role",
+ )
+
+ role["profile"] = json.loads(role["profile"])
+
+ return role
+
+ def get_local_groups_for_room(self, room_id):
+ """Get all of the local group that contain a given room
+ Args:
+ room_id (str): The ID of a room
+ Returns:
+ Deferred[list[str]]: A twisted.Deferred containing a list of group ids
+ containing this room
+ """
+ return self.db.simple_select_onecol(
+ table="group_rooms",
+ keyvalues={"room_id": room_id},
+ retcol="group_id",
+ desc="get_local_groups_for_room",
+ )
+
+ def get_users_for_summary_by_role(self, group_id, include_private=False):
+ """Get the users and roles that should be included in a summary request
+
+ Returns ([users], [roles])
+ """
+
+ def _get_users_for_summary_txn(txn):
+ keyvalues = {"group_id": group_id}
+ if not include_private:
+ keyvalues["is_public"] = True
+
+ sql = """
+ SELECT user_id, is_public, role_id, user_order
+ FROM group_summary_users
+ WHERE group_id = ?
+ """
+
+ if not include_private:
+ sql += " AND is_public = ?"
+ txn.execute(sql, (group_id, True))
+ else:
+ txn.execute(sql, (group_id,))
+
+ users = [
+ {
+ "user_id": row[0],
+ "is_public": row[1],
+ "role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None,
+ "order": row[3],
+ }
+ for row in txn
+ ]
+
+ sql = """
+ SELECT role_id, is_public, profile, role_order
+ FROM group_summary_roles
+ INNER JOIN group_roles USING (group_id, role_id)
+ WHERE group_id = ?
+ """
+
+ if not include_private:
+ sql += " AND is_public = ?"
+ txn.execute(sql, (group_id, True))
+ else:
+ txn.execute(sql, (group_id,))
+
+ roles = {
+ row[0]: {
+ "is_public": row[1],
+ "profile": json.loads(row[2]),
+ "order": row[3],
+ }
+ for row in txn
+ }
+
+ return users, roles
+
+ return self.db.runInteraction(
+ "get_users_for_summary_by_role", _get_users_for_summary_txn
+ )
+
+ def is_user_in_group(self, user_id, group_id):
+ return self.db.simple_select_one_onecol(
+ table="group_users",
+ keyvalues={"group_id": group_id, "user_id": user_id},
+ retcol="user_id",
+ allow_none=True,
+ desc="is_user_in_group",
+ ).addCallback(lambda r: bool(r))
+
+ def is_user_admin_in_group(self, group_id, user_id):
+ return self.db.simple_select_one_onecol(
+ table="group_users",
+ keyvalues={"group_id": group_id, "user_id": user_id},
+ retcol="is_admin",
+ allow_none=True,
+ desc="is_user_admin_in_group",
+ )
+
+ def is_user_invited_to_local_group(self, group_id, user_id):
+ """Has the group server invited a user?
+ """
+ return self.db.simple_select_one_onecol(
+ table="group_invites",
+ keyvalues={"group_id": group_id, "user_id": user_id},
+ retcol="user_id",
+ desc="is_user_invited_to_local_group",
+ allow_none=True,
+ )
+
+ def get_users_membership_info_in_group(self, group_id, user_id):
+ """Get a dict describing the membership of a user in a group.
+
+ Example if joined:
+
+ {
+ "membership": "join",
+ "is_public": True,
+ "is_privileged": False,
+ }
+
+ Returns an empty dict if the user is not join/invite/etc
+ """
+
+ def _get_users_membership_in_group_txn(txn):
+ row = self.db.simple_select_one_txn(
+ txn,
+ table="group_users",
+ keyvalues={"group_id": group_id, "user_id": user_id},
+ retcols=("is_admin", "is_public"),
+ allow_none=True,
+ )
+
+ if row:
+ return {
+ "membership": "join",
+ "is_public": row["is_public"],
+ "is_privileged": row["is_admin"],
+ }
+
+ row = self.db.simple_select_one_onecol_txn(
+ txn,
+ table="group_invites",
+ keyvalues={"group_id": group_id, "user_id": user_id},
+ retcol="user_id",
+ allow_none=True,
+ )
+
+ if row:
+ return {"membership": "invite"}
+
+ return {}
+
+ return self.db.runInteraction(
+ "get_users_membership_info_in_group", _get_users_membership_in_group_txn
+ )
+
+ def get_publicised_groups_for_user(self, user_id):
+ """Get all groups a user is publicising
+ """
+ return self.db.simple_select_onecol(
+ table="local_group_membership",
+ keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True},
+ retcol="group_id",
+ desc="get_publicised_groups_for_user",
+ )
+
+ def get_attestations_need_renewals(self, valid_until_ms):
+ """Get all attestations that need to be renewed until givent time
+ """
+
+ def _get_attestations_need_renewals_txn(txn):
+ sql = """
+ SELECT group_id, user_id FROM group_attestations_renewals
+ WHERE valid_until_ms <= ?
+ """
+ txn.execute(sql, (valid_until_ms,))
+ return self.db.cursor_to_dict(txn)
+
+ return self.db.runInteraction(
+ "get_attestations_need_renewals", _get_attestations_need_renewals_txn
+ )
+
+ @defer.inlineCallbacks
+ def get_remote_attestation(self, group_id, user_id):
+ """Get the attestation that proves the remote agrees that the user is
+ in the group.
+ """
+ row = yield self.db.simple_select_one(
+ table="group_attestations_remote",
+ keyvalues={"group_id": group_id, "user_id": user_id},
+ retcols=("valid_until_ms", "attestation_json"),
+ desc="get_remote_attestation",
+ allow_none=True,
+ )
+
+ now = int(self._clock.time_msec())
+ if row and now < row["valid_until_ms"]:
+ return json.loads(row["attestation_json"])
+
+ return None
+
+ def get_joined_groups(self, user_id):
+ return self.db.simple_select_onecol(
+ table="local_group_membership",
+ keyvalues={"user_id": user_id, "membership": "join"},
+ retcol="group_id",
+ desc="get_joined_groups",
+ )
+
+ def get_all_groups_for_user(self, user_id, now_token):
+ def _get_all_groups_for_user_txn(txn):
+ sql = """
+ SELECT group_id, type, membership, u.content
+ FROM local_group_updates AS u
+ INNER JOIN local_group_membership USING (group_id, user_id)
+ WHERE user_id = ? AND membership != 'leave'
+ AND stream_id <= ?
+ """
+ txn.execute(sql, (user_id, now_token))
+ return [
+ {
+ "group_id": row[0],
+ "type": row[1],
+ "membership": row[2],
+ "content": json.loads(row[3]),
+ }
+ for row in txn
+ ]
+
+ return self.db.runInteraction(
+ "get_all_groups_for_user", _get_all_groups_for_user_txn
+ )
+
+ def get_groups_changes_for_user(self, user_id, from_token, to_token):
+ from_token = int(from_token)
+ has_changed = self._group_updates_stream_cache.has_entity_changed(
+ user_id, from_token
+ )
+ if not has_changed:
+ return defer.succeed([])
+
+ def _get_groups_changes_for_user_txn(txn):
+ sql = """
+ SELECT group_id, membership, type, u.content
+ FROM local_group_updates AS u
+ INNER JOIN local_group_membership USING (group_id, user_id)
+ WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
+ """
+ txn.execute(sql, (user_id, from_token, to_token))
+ return [
+ {
+ "group_id": group_id,
+ "membership": membership,
+ "type": gtype,
+ "content": json.loads(content_json),
+ }
+ for group_id, membership, gtype, content_json in txn
+ ]
+
+ return self.db.runInteraction(
+ "get_groups_changes_for_user", _get_groups_changes_for_user_txn
+ )
+
+ def get_all_groups_changes(self, from_token, to_token, limit):
+ from_token = int(from_token)
+ has_changed = self._group_updates_stream_cache.has_any_entity_changed(
+ from_token
+ )
+ if not has_changed:
+ return defer.succeed([])
+
+ def _get_all_groups_changes_txn(txn):
+ sql = """
+ SELECT stream_id, group_id, user_id, type, content
+ FROM local_group_updates
+ WHERE ? < stream_id AND stream_id <= ?
+ LIMIT ?
+ """
+ txn.execute(sql, (from_token, to_token, limit))
+ return [
+ (stream_id, group_id, user_id, gtype, json.loads(content_json))
+ for stream_id, group_id, user_id, gtype, content_json in txn
+ ]
+
+ return self.db.runInteraction(
+ "get_all_groups_changes", _get_all_groups_changes_txn
+ )
+
+
+class GroupServerStore(GroupServerWorkerStore):
+ def set_group_join_policy(self, group_id, join_policy):
+ """Set the join policy of a group.
+
+ join_policy can be one of:
+ * "invite"
+ * "open"
+ """
+ return self.db.simple_update_one(
+ table="groups",
+ keyvalues={"group_id": group_id},
+ updatevalues={"join_policy": join_policy},
+ desc="set_group_join_policy",
+ )
+
def add_room_to_summary(self, group_id, room_id, category_id, order, is_public):
return self.db.runInteraction(
"add_room_to_summary",
@@ -299,36 +645,6 @@ class GroupServerStore(SQLBaseStore):
desc="remove_room_from_summary",
)
- @defer.inlineCallbacks
- def get_group_categories(self, group_id):
- rows = yield self.db.simple_select_list(
- table="group_room_categories",
- keyvalues={"group_id": group_id},
- retcols=("category_id", "is_public", "profile"),
- desc="get_group_categories",
- )
-
- return {
- row["category_id"]: {
- "is_public": row["is_public"],
- "profile": json.loads(row["profile"]),
- }
- for row in rows
- }
-
- @defer.inlineCallbacks
- def get_group_category(self, group_id, category_id):
- category = yield self.db.simple_select_one(
- table="group_room_categories",
- keyvalues={"group_id": group_id, "category_id": category_id},
- retcols=("is_public", "profile"),
- desc="get_group_category",
- )
-
- category["profile"] = json.loads(category["profile"])
-
- return category
-
def upsert_group_category(self, group_id, category_id, profile, is_public):
"""Add/update room category for group
"""
@@ -360,36 +676,6 @@ class GroupServerStore(SQLBaseStore):
desc="remove_group_category",
)
- @defer.inlineCallbacks
- def get_group_roles(self, group_id):
- rows = yield self.db.simple_select_list(
- table="group_roles",
- keyvalues={"group_id": group_id},
- retcols=("role_id", "is_public", "profile"),
- desc="get_group_roles",
- )
-
- return {
- row["role_id"]: {
- "is_public": row["is_public"],
- "profile": json.loads(row["profile"]),
- }
- for row in rows
- }
-
- @defer.inlineCallbacks
- def get_group_role(self, group_id, role_id):
- role = yield self.db.simple_select_one(
- table="group_roles",
- keyvalues={"group_id": group_id, "role_id": role_id},
- retcols=("is_public", "profile"),
- desc="get_group_role",
- )
-
- role["profile"] = json.loads(role["profile"])
-
- return role
-
def upsert_group_role(self, group_id, role_id, profile, is_public):
"""Add/remove user role
"""
@@ -555,100 +841,6 @@ class GroupServerStore(SQLBaseStore):
desc="remove_user_from_summary",
)
- def get_local_groups_for_room(self, room_id):
- """Get all of the local group that contain a given room
- Args:
- room_id (str): The ID of a room
- Returns:
- Deferred[list[str]]: A twisted.Deferred containing a list of group ids
- containing this room
- """
- return self.db.simple_select_onecol(
- table="group_rooms",
- keyvalues={"room_id": room_id},
- retcol="group_id",
- desc="get_local_groups_for_room",
- )
-
- def get_users_for_summary_by_role(self, group_id, include_private=False):
- """Get the users and roles that should be included in a summary request
-
- Returns ([users], [roles])
- """
-
- def _get_users_for_summary_txn(txn):
- keyvalues = {"group_id": group_id}
- if not include_private:
- keyvalues["is_public"] = True
-
- sql = """
- SELECT user_id, is_public, role_id, user_order
- FROM group_summary_users
- WHERE group_id = ?
- """
-
- if not include_private:
- sql += " AND is_public = ?"
- txn.execute(sql, (group_id, True))
- else:
- txn.execute(sql, (group_id,))
-
- users = [
- {
- "user_id": row[0],
- "is_public": row[1],
- "role_id": row[2] if row[2] != _DEFAULT_ROLE_ID else None,
- "order": row[3],
- }
- for row in txn
- ]
-
- sql = """
- SELECT role_id, is_public, profile, role_order
- FROM group_summary_roles
- INNER JOIN group_roles USING (group_id, role_id)
- WHERE group_id = ?
- """
-
- if not include_private:
- sql += " AND is_public = ?"
- txn.execute(sql, (group_id, True))
- else:
- txn.execute(sql, (group_id,))
-
- roles = {
- row[0]: {
- "is_public": row[1],
- "profile": json.loads(row[2]),
- "order": row[3],
- }
- for row in txn
- }
-
- return users, roles
-
- return self.db.runInteraction(
- "get_users_for_summary_by_role", _get_users_for_summary_txn
- )
-
- def is_user_in_group(self, user_id, group_id):
- return self.db.simple_select_one_onecol(
- table="group_users",
- keyvalues={"group_id": group_id, "user_id": user_id},
- retcol="user_id",
- allow_none=True,
- desc="is_user_in_group",
- ).addCallback(lambda r: bool(r))
-
- def is_user_admin_in_group(self, group_id, user_id):
- return self.db.simple_select_one_onecol(
- table="group_users",
- keyvalues={"group_id": group_id, "user_id": user_id},
- retcol="is_admin",
- allow_none=True,
- desc="is_user_admin_in_group",
- )
-
def add_group_invite(self, group_id, user_id):
"""Record that the group server has invited a user
"""
@@ -658,64 +850,6 @@ class GroupServerStore(SQLBaseStore):
desc="add_group_invite",
)
- def is_user_invited_to_local_group(self, group_id, user_id):
- """Has the group server invited a user?
- """
- return self.db.simple_select_one_onecol(
- table="group_invites",
- keyvalues={"group_id": group_id, "user_id": user_id},
- retcol="user_id",
- desc="is_user_invited_to_local_group",
- allow_none=True,
- )
-
- def get_users_membership_info_in_group(self, group_id, user_id):
- """Get a dict describing the membership of a user in a group.
-
- Example if joined:
-
- {
- "membership": "join",
- "is_public": True,
- "is_privileged": False,
- }
-
- Returns an empty dict if the user is not join/invite/etc
- """
-
- def _get_users_membership_in_group_txn(txn):
- row = self.db.simple_select_one_txn(
- txn,
- table="group_users",
- keyvalues={"group_id": group_id, "user_id": user_id},
- retcols=("is_admin", "is_public"),
- allow_none=True,
- )
-
- if row:
- return {
- "membership": "join",
- "is_public": row["is_public"],
- "is_privileged": row["is_admin"],
- }
-
- row = self.db.simple_select_one_onecol_txn(
- txn,
- table="group_invites",
- keyvalues={"group_id": group_id, "user_id": user_id},
- retcol="user_id",
- allow_none=True,
- )
-
- if row:
- return {"membership": "invite"}
-
- return {}
-
- return self.db.runInteraction(
- "get_users_membership_info_in_group", _get_users_membership_in_group_txn
- )
-
def add_user_to_group(
self,
group_id,
@@ -846,16 +980,6 @@ class GroupServerStore(SQLBaseStore):
"remove_room_from_group", _remove_room_from_group_txn
)
- def get_publicised_groups_for_user(self, user_id):
- """Get all groups a user is publicising
- """
- return self.db.simple_select_onecol(
- table="local_group_membership",
- keyvalues={"user_id": user_id, "membership": "join", "is_publicised": True},
- retcol="group_id",
- desc="get_publicised_groups_for_user",
- )
-
def update_group_publicity(self, group_id, user_id, publicise):
"""Update whether the user is publicising their membership of the group
"""
@@ -1000,22 +1124,6 @@ class GroupServerStore(SQLBaseStore):
desc="update_group_profile",
)
- def get_attestations_need_renewals(self, valid_until_ms):
- """Get all attestations that need to be renewed until givent time
- """
-
- def _get_attestations_need_renewals_txn(txn):
- sql = """
- SELECT group_id, user_id FROM group_attestations_renewals
- WHERE valid_until_ms <= ?
- """
- txn.execute(sql, (valid_until_ms,))
- return self.db.cursor_to_dict(txn)
-
- return self.db.runInteraction(
- "get_attestations_need_renewals", _get_attestations_need_renewals_txn
- )
-
def update_attestation_renewal(self, group_id, user_id, attestation):
"""Update an attestation that we have renewed
"""
@@ -1054,112 +1162,6 @@ class GroupServerStore(SQLBaseStore):
desc="remove_attestation_renewal",
)
- @defer.inlineCallbacks
- def get_remote_attestation(self, group_id, user_id):
- """Get the attestation that proves the remote agrees that the user is
- in the group.
- """
- row = yield self.db.simple_select_one(
- table="group_attestations_remote",
- keyvalues={"group_id": group_id, "user_id": user_id},
- retcols=("valid_until_ms", "attestation_json"),
- desc="get_remote_attestation",
- allow_none=True,
- )
-
- now = int(self._clock.time_msec())
- if row and now < row["valid_until_ms"]:
- return json.loads(row["attestation_json"])
-
- return None
-
- def get_joined_groups(self, user_id):
- return self.db.simple_select_onecol(
- table="local_group_membership",
- keyvalues={"user_id": user_id, "membership": "join"},
- retcol="group_id",
- desc="get_joined_groups",
- )
-
- def get_all_groups_for_user(self, user_id, now_token):
- def _get_all_groups_for_user_txn(txn):
- sql = """
- SELECT group_id, type, membership, u.content
- FROM local_group_updates AS u
- INNER JOIN local_group_membership USING (group_id, user_id)
- WHERE user_id = ? AND membership != 'leave'
- AND stream_id <= ?
- """
- txn.execute(sql, (user_id, now_token))
- return [
- {
- "group_id": row[0],
- "type": row[1],
- "membership": row[2],
- "content": json.loads(row[3]),
- }
- for row in txn
- ]
-
- return self.db.runInteraction(
- "get_all_groups_for_user", _get_all_groups_for_user_txn
- )
-
- def get_groups_changes_for_user(self, user_id, from_token, to_token):
- from_token = int(from_token)
- has_changed = self._group_updates_stream_cache.has_entity_changed(
- user_id, from_token
- )
- if not has_changed:
- return defer.succeed([])
-
- def _get_groups_changes_for_user_txn(txn):
- sql = """
- SELECT group_id, membership, type, u.content
- FROM local_group_updates AS u
- INNER JOIN local_group_membership USING (group_id, user_id)
- WHERE user_id = ? AND ? < stream_id AND stream_id <= ?
- """
- txn.execute(sql, (user_id, from_token, to_token))
- return [
- {
- "group_id": group_id,
- "membership": membership,
- "type": gtype,
- "content": json.loads(content_json),
- }
- for group_id, membership, gtype, content_json in txn
- ]
-
- return self.db.runInteraction(
- "get_groups_changes_for_user", _get_groups_changes_for_user_txn
- )
-
- def get_all_groups_changes(self, from_token, to_token, limit):
- from_token = int(from_token)
- has_changed = self._group_updates_stream_cache.has_any_entity_changed(
- from_token
- )
- if not has_changed:
- return defer.succeed([])
-
- def _get_all_groups_changes_txn(txn):
- sql = """
- SELECT stream_id, group_id, user_id, type, content
- FROM local_group_updates
- WHERE ? < stream_id AND stream_id <= ?
- LIMIT ?
- """
- txn.execute(sql, (from_token, to_token, limit))
- return [
- (stream_id, group_id, user_id, gtype, json.loads(content_json))
- for stream_id, group_id, user_id, gtype, content_json in txn
- ]
-
- return self.db.runInteraction(
- "get_all_groups_changes", _get_all_groups_changes_txn
- )
-
def get_group_stream_token(self):
return self._group_updates_id_gen.get_current_token()
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index 47ebb8a214..2c50e04515 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -725,7 +725,7 @@ def _parse_query(database_engine, search_term):
results = re.findall(r"([\w\-]+)", search_term, re.UNICODE)
if isinstance(database_engine, PostgresEngine):
- return " & ".join(result + ":*" for result in results)
+ return " & ".join(result for result in results)
elif isinstance(database_engine, Sqlite3Engine):
return " & ".join(result + "*" for result in results)
else:
|