diff --git a/contrib/prometheus/consoles/synapse.html b/contrib/prometheus/consoles/synapse.html
index e23d8a1fce..69aa87f85e 100644
--- a/contrib/prometheus/consoles/synapse.html
+++ b/contrib/prometheus/consoles/synapse.html
@@ -202,11 +202,11 @@ new PromConsole.Graph({
<h1>Requests</h1>
<h3>Requests by Servlet</h3>
-<div id="synapse_http_server_requests_servlet"></div>
+<div id="synapse_http_server_request_count_servlet"></div>
<script>
new PromConsole.Graph({
- node: document.querySelector("#synapse_http_server_requests_servlet"),
- expr: "rate(synapse_http_server_requests:servlet[2m])",
+ node: document.querySelector("#synapse_http_server_request_count_servlet"),
+ expr: "rate(synapse_http_server_request_count:servlet[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@@ -215,11 +215,11 @@ new PromConsole.Graph({
})
</script>
<h4> (without <tt>EventStreamRestServlet</tt> or <tt>SyncRestServlet</tt>)</h4>
-<div id="synapse_http_server_requests_servlet_minus_events"></div>
+<div id="synapse_http_server_request_count_servlet_minus_events"></div>
<script>
new PromConsole.Graph({
- node: document.querySelector("#synapse_http_server_requests_servlet_minus_events"),
- expr: "rate(synapse_http_server_requests:servlet{servlet!=\"EventStreamRestServlet\", servlet!=\"SyncRestServlet\"}[2m])",
+ node: document.querySelector("#synapse_http_server_request_count_servlet_minus_events"),
+ expr: "rate(synapse_http_server_request_count:servlet{servlet!=\"EventStreamRestServlet\", servlet!=\"SyncRestServlet\"}[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@@ -233,7 +233,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_response_time_avg"),
- expr: "rate(synapse_http_server_response_time:total[2m]) / rate(synapse_http_server_response_time:count[2m]) / 1000",
+      expr: "rate(synapse_http_server_response_time_seconds[2m]) / rate(synapse_http_server_response_count[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@@ -276,7 +276,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_response_ru_utime"),
- expr: "rate(synapse_http_server_response_ru_utime:total[2m])",
+ expr: "rate(synapse_http_server_response_ru_utime_seconds[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@@ -291,7 +291,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_response_db_txn_duration"),
- expr: "rate(synapse_http_server_response_db_txn_duration:total[2m])",
+ expr: "rate(synapse_http_server_response_db_txn_duration_seconds[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
@@ -306,7 +306,7 @@ new PromConsole.Graph({
<script>
new PromConsole.Graph({
node: document.querySelector("#synapse_http_server_send_time_avg"),
- expr: "rate(synapse_http_server_response_time:total{servlet='RoomSendEventRestServlet'}[2m]) / rate(synapse_http_server_response_time:count{servlet='RoomSendEventRestServlet'}[2m]) / 1000",
+      expr: "rate(synapse_http_server_response_time_seconds{servlet='RoomSendEventRestServlet'}[2m]) / rate(synapse_http_server_response_count{servlet='RoomSendEventRestServlet'}[2m])",
name: "[[servlet]]",
yAxisFormatter: PromConsole.NumberFormatter.humanize,
yHoverFormatter: PromConsole.NumberFormatter.humanize,
diff --git a/contrib/prometheus/synapse-v1.rules b/contrib/prometheus/synapse-v1.rules
index b6f84174b0..4c900ba537 100644
--- a/contrib/prometheus/synapse-v1.rules
+++ b/contrib/prometheus/synapse-v1.rules
@@ -1,10 +1,10 @@
synapse_federation_transaction_queue_pendingEdus:total = sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)
synapse_federation_transaction_queue_pendingPdus:total = sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)
-synapse_http_server_requests:method{servlet=""} = sum(synapse_http_server_requests) by (method)
-synapse_http_server_requests:servlet{method=""} = sum(synapse_http_server_requests) by (servlet)
+synapse_http_server_request_count:method{servlet=""} = sum(synapse_http_server_request_count) by (method)
+synapse_http_server_request_count:servlet{method=""} = sum(synapse_http_server_request_count) by (servlet)
-synapse_http_server_requests:total{servlet=""} = sum(synapse_http_server_requests:by_method) by (servlet)
+synapse_http_server_request_count:total{servlet=""} = sum(synapse_http_server_request_count:by_method) by (servlet)
synapse_cache:hit_ratio_5m = rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])
synapse_cache:hit_ratio_30s = rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s])
diff --git a/contrib/prometheus/synapse-v2.rules b/contrib/prometheus/synapse-v2.rules
index 07e37a885e..6ccca2daaf 100644
--- a/contrib/prometheus/synapse-v2.rules
+++ b/contrib/prometheus/synapse-v2.rules
@@ -5,19 +5,19 @@ groups:
expr: "sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)"
- record: "synapse_federation_transaction_queue_pendingPdus:total"
expr: "sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)"
- - record: 'synapse_http_server_requests:method'
+ - record: 'synapse_http_server_request_count:method'
labels:
servlet: ""
- expr: "sum(synapse_http_server_requests) by (method)"
- - record: 'synapse_http_server_requests:servlet'
+ expr: "sum(synapse_http_server_request_count) by (method)"
+ - record: 'synapse_http_server_request_count:servlet'
labels:
method: ""
- expr: 'sum(synapse_http_server_requests) by (servlet)'
+ expr: 'sum(synapse_http_server_request_count) by (servlet)'
- - record: 'synapse_http_server_requests:total'
+ - record: 'synapse_http_server_request_count:total'
labels:
servlet: ""
- expr: 'sum(synapse_http_server_requests:by_method) by (servlet)'
+ expr: 'sum(synapse_http_server_request_count:by_method) by (servlet)'
- record: 'synapse_cache:hit_ratio_5m'
expr: 'rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])'
diff --git a/docs/workers.rst b/docs/workers.rst
index 80f8d2181a..1d521b9ec5 100644
--- a/docs/workers.rst
+++ b/docs/workers.rst
@@ -55,7 +55,12 @@ synapse process.)
You then create a set of configs for the various worker processes. These
should be worker configuration files, and should be stored in a dedicated
-subdirectory, to allow synctl to manipulate them.
+subdirectory, to allow synctl to manipulate them. You will also need to create
+an additional configuration file for the master synapse process itself, since
+it will no longer be started automatically. That configuration should look
+like this::
+
+ worker_app: synapse.app.homeserver
+ daemonize: true
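+
+Once the configuration files are in place, you can tell ``synctl`` to operate
+on all of them at once by passing ``-a`` with the path to that subdirectory;
+for example (directory path illustrative)::
+
+    synctl -a /etc/synapse/workers start
+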
Each worker configuration file inherits the configuration of the main homeserver
configuration file. You can then override configuration specific to that worker,
@@ -230,9 +235,11 @@ file. For example::
``synapse.app.event_creator``
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-Handles non-state event creation. It can handle REST endpoints matching::
+Handles some event creation. It can handle REST endpoints matching::
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
+ ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$
+ ^/_matrix/client/(api/v1|r0|unstable)/join/
It will create events locally and then send them on to the main synapse
instance to be persisted and handled.
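
A minimal configuration for this worker might look something like the
following (ports and file paths are illustrative, and the replication ports
must match those configured on the main synapse process)::

    worker_app: synapse.app.event_creator

    worker_replication_host: 127.0.0.1
    worker_replication_port: 9092
    worker_replication_http_port: 9093

    worker_listeners:
     - type: http
       port: 8091
       resources:
         - names: [client]

    worker_daemonize: True
    worker_pid_file: /home/matrix/synapse/event_creator.pid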
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index d46581e4e1..7b23a44854 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -1,6 +1,7 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -250,6 +251,12 @@ class Porter(object):
@defer.inlineCallbacks
def handle_table(self, table, postgres_size, table_size, forward_chunk,
backward_chunk):
+ logger.info(
+ "Table %s: %i/%i (rows %i-%i) already ported",
+ table, postgres_size, table_size,
+ backward_chunk+1, forward_chunk-1,
+ )
+
if not table_size:
return
@@ -467,31 +474,10 @@ class Porter(object):
self.progress.set_state("Preparing PostgreSQL")
self.setup_db(postgres_config, postgres_engine)
- # Step 2. Get tables.
- self.progress.set_state("Fetching tables")
- sqlite_tables = yield self.sqlite_store._simple_select_onecol(
- table="sqlite_master",
- keyvalues={
- "type": "table",
- },
- retcol="name",
- )
-
- postgres_tables = yield self.postgres_store._simple_select_onecol(
- table="information_schema.tables",
- keyvalues={},
- retcol="distinct table_name",
- )
-
- tables = set(sqlite_tables) & set(postgres_tables)
-
- self.progress.set_state("Creating tables")
-
- logger.info("Found %d tables", len(tables))
-
+ self.progress.set_state("Creating port tables")
def create_port_table(txn):
txn.execute(
- "CREATE TABLE port_from_sqlite3 ("
+ "CREATE TABLE IF NOT EXISTS port_from_sqlite3 ("
" table_name varchar(100) NOT NULL UNIQUE,"
" forward_rowid bigint NOT NULL,"
" backward_rowid bigint NOT NULL"
@@ -517,18 +503,33 @@ class Porter(object):
"alter_table", alter_table
)
-        except Exception as e:
-            logger.info("Failed to create port table: %s", e)
+        except Exception:
+            # this is expected to fail if the port has already been run,
+            # so ignore errors here
+            pass
- try:
- yield self.postgres_store.runInteraction(
- "create_port_table", create_port_table
- )
- except Exception as e:
- logger.info("Failed to create port table: %s", e)
+ yield self.postgres_store.runInteraction(
+ "create_port_table", create_port_table
+ )
+
+ # Step 2. Get tables.
+ self.progress.set_state("Fetching tables")
+ sqlite_tables = yield self.sqlite_store._simple_select_onecol(
+ table="sqlite_master",
+ keyvalues={
+ "type": "table",
+ },
+ retcol="name",
+ )
- self.progress.set_state("Setting up")
+ postgres_tables = yield self.postgres_store._simple_select_onecol(
+ table="information_schema.tables",
+ keyvalues={},
+ retcol="distinct table_name",
+ )
- # Set up tables.
+ tables = set(sqlite_tables) & set(postgres_tables)
+ logger.info("Found %d tables", len(tables))
+
+ # Step 3. Figure out what still needs copying
+ self.progress.set_state("Checking on port progress")
setup_res = yield defer.gatherResults(
[
self.setup_table(table)
@@ -539,7 +540,8 @@ class Porter(object):
consumeErrors=True,
)
- # Process tables.
+ # Step 4. Do the copying.
+ self.progress.set_state("Copying to postgres")
yield defer.gatherResults(
[
self.handle_table(*res)
@@ -548,6 +550,9 @@ class Porter(object):
consumeErrors=True,
)
+ # Step 5. Do final post-processing
+ yield self._setup_state_group_id_seq()
+
self.progress.done()
except:
global end_error_exec_info
@@ -707,6 +712,16 @@ class Porter(object):
defer.returnValue((done, remaining + done))
+ def _setup_state_group_id_seq(self):
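+        # The port inserts state group rows with their existing ids, so the
+        # postgres state_group_id_seq sequence never advances; bump it past
+        # the highest ported id so that newly-created state groups do not
+        # collide with the ported ones.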
+ def r(txn):
+            txn.execute("SELECT MAX(id) FROM state_groups")
+            curr_id = txn.fetchone()[0]
+            if curr_id is None:
+                # no state groups were ported, so leave the sequence alone
+                return
+            txn.execute(
+                "ALTER SEQUENCE state_group_id_seq RESTART WITH %s",
+                (curr_id + 1,),
+            )
+ return self.postgres_store.runInteraction("setup_state_group_id_seq", r)
+
##############################################
###### The following is simply UI stuff ######
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index b935beb974..a0e465d644 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -48,6 +48,7 @@ from synapse.server import HomeServer
from synapse.storage import are_all_users_on_domain
from synapse.storage.engines import IncorrectDatabaseSetup, create_engine
from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database
+from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext
from synapse.util.manhole import manhole
@@ -429,8 +430,14 @@ def run(hs):
stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms()
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
+ r30_results = yield hs.get_datastore().count_r30_users()
+ for name, count in r30_results.iteritems():
+ stats["r30_users_" + name] = count
+
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
stats["daily_sent_messages"] = daily_sent_messages
+ stats["cache_factor"] = CACHE_SIZE_FACTOR
+ stats["event_cache_size"] = hs.config.event_cache_size
if len(stats_process) > 0:
stats["memory_rss"] = 0
diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py
index b0e1b5e66a..712dfa870e 100755
--- a/synapse/app/synctl.py
+++ b/synapse/app/synctl.py
@@ -252,6 +252,7 @@ def main():
for running_pid in running_pids:
while pid_running(running_pid):
time.sleep(0.2)
+ write("All processes exited; now restarting...")
if action == "start" or action == "restart":
if start_stop_synapse:
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 5488e82985..50a967a7ec 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -614,6 +615,19 @@ class TransportLayerClient(object):
)
@log_function
+ def join_group(self, destination, group_id, user_id, content):
+ """Attempts to join a group
+ """
+ path = PREFIX + "/groups/%s/users/%s/join" % (group_id, user_id)
+
+ return self.client.post_json(
+ destination=destination,
+ path=path,
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
def invite_to_group(self, destination, group_id, user_id, requester_user_id, content):
"""Invite a user to a group
"""
@@ -857,6 +871,21 @@ class TransportLayerClient(object):
)
@log_function
+ def set_group_join_policy(self, destination, group_id, requester_user_id,
+ content):
+ """Sets the join policy for a group
+ """
+ path = PREFIX + "/groups/%s/settings/m.join_policy" % (group_id,)
+
+ return self.client.put_json(
+ destination=destination,
+ path=path,
+ args={"requester_user_id": requester_user_id},
+ data=content,
+ ignore_backoff=True,
+ )
+
+ @log_function
def delete_group_summary_user(self, destination, group_id, requester_user_id,
user_id, role_id):
"""Delete a users entry in a group
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index a66a6b0692..4c94d5a36c 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -802,6 +803,23 @@ class FederationGroupsAcceptInviteServlet(BaseFederationServlet):
defer.returnValue((200, new_content))
+class FederationGroupsJoinServlet(BaseFederationServlet):
+ """Attempt to join a group
+ """
+ PATH = "/groups/(?P<group_id>[^/]*)/users/(?P<user_id>[^/]*)/join$"
+
+ @defer.inlineCallbacks
+ def on_POST(self, origin, content, query, group_id, user_id):
+ if get_domain_from_id(user_id) != origin:
+ raise SynapseError(403, "user_id doesn't match origin")
+
+ new_content = yield self.handler.join_group(
+ group_id, user_id, content,
+ )
+
+ defer.returnValue((200, new_content))
+
+
class FederationGroupsRemoveUserServlet(BaseFederationServlet):
"""Leave or kick a user from the group
"""
@@ -1124,6 +1142,24 @@ class FederationGroupsBulkPublicisedServlet(BaseFederationServlet):
defer.returnValue((200, resp))
+class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
+ """Sets whether a group is joinable without an invite or knock
+ """
+ PATH = "/groups/(?P<group_id>[^/]*)/settings/m.join_policy$"
+
+ @defer.inlineCallbacks
+ def on_PUT(self, origin, content, query, group_id):
+ requester_user_id = parse_string_from_args(query, "requester_user_id")
+ if get_domain_from_id(requester_user_id) != origin:
+ raise SynapseError(403, "requester_user_id doesn't match origin")
+
+ new_content = yield self.handler.set_group_join_policy(
+ group_id, requester_user_id, content
+ )
+
+ defer.returnValue((200, new_content))
+
+
FEDERATION_SERVLET_CLASSES = (
FederationSendServlet,
FederationPullServlet,
@@ -1163,6 +1199,7 @@ GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsInvitedUsersServlet,
FederationGroupsInviteServlet,
FederationGroupsAcceptInviteServlet,
+ FederationGroupsJoinServlet,
FederationGroupsRemoveUserServlet,
FederationGroupsSummaryRoomsServlet,
FederationGroupsCategoriesServlet,
@@ -1172,6 +1209,7 @@ GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsSummaryUsersServlet,
FederationGroupsAddRoomsServlet,
FederationGroupsAddRoomsConfigServlet,
+ FederationGroupsSettingJoinPolicyServlet,
)
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index 0b995aed70..2d95b04e0c 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -206,6 +207,28 @@ class GroupsServerHandler(object):
defer.returnValue({})
@defer.inlineCallbacks
+ def set_group_join_policy(self, group_id, requester_user_id, content):
+ """Sets the group join policy.
+
+ Currently supported policies are:
+ - "invite": an invite must be received and accepted in order to join.
+ - "open": anyone can join.
+ """
+ yield self.check_group_is_ours(
+ group_id, requester_user_id, and_exists=True, and_is_admin=requester_user_id
+ )
+
+ join_policy = _parse_join_policy_from_contents(content)
+ if join_policy is None:
+ raise SynapseError(
+ 400, "No value specified for 'm.join_policy'"
+ )
+
+ yield self.store.set_group_join_policy(group_id, join_policy=join_policy)
+
+ defer.returnValue({})
+
+ @defer.inlineCallbacks
def get_group_categories(self, group_id, requester_user_id):
"""Get all categories in a group (as seen by user)
"""
@@ -381,9 +404,16 @@ class GroupsServerHandler(object):
yield self.check_group_is_ours(group_id, requester_user_id)
- group_description = yield self.store.get_group(group_id)
+ group = yield self.store.get_group(group_id)
+
+ if group:
+ cols = [
+ "name", "short_description", "long_description",
+ "avatar_url", "is_public",
+ ]
+ group_description = {key: group[key] for key in cols}
+ group_description["is_openly_joinable"] = group["join_policy"] == "open"
- if group_description:
defer.returnValue(group_description)
else:
raise SynapseError(404, "Unknown group")
@@ -655,30 +685,21 @@ class GroupsServerHandler(object):
raise SynapseError(502, "Unknown state returned by HS")
@defer.inlineCallbacks
- def accept_invite(self, group_id, requester_user_id, content):
- """User tries to accept an invite to the group.
+ def _add_user(self, group_id, user_id, content):
+ """Add a user to a group based on a content dict.
- This is different from them asking to join, and so should error if no
- invite exists (and they're not a member of the group)
+ See accept_invite, join_group.
"""
-
- yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
-
- is_invited = yield self.store.is_user_invited_to_local_group(
- group_id, requester_user_id,
- )
- if not is_invited:
- raise SynapseError(403, "User not invited to group")
-
- if not self.hs.is_mine_id(requester_user_id):
+ if not self.hs.is_mine_id(user_id):
local_attestation = self.attestations.create_attestation(
- group_id, requester_user_id,
+ group_id, user_id,
)
+
remote_attestation = content["attestation"]
yield self.attestations.verify_attestation(
remote_attestation,
- user_id=requester_user_id,
+ user_id=user_id,
group_id=group_id,
)
else:
@@ -688,13 +709,53 @@ class GroupsServerHandler(object):
is_public = _parse_visibility_from_contents(content)
yield self.store.add_user_to_group(
- group_id, requester_user_id,
+ group_id, user_id,
is_admin=False,
is_public=is_public,
local_attestation=local_attestation,
remote_attestation=remote_attestation,
)
+ defer.returnValue(local_attestation)
+
+ @defer.inlineCallbacks
+ def accept_invite(self, group_id, requester_user_id, content):
+ """User tries to accept an invite to the group.
+
+ This is different from them asking to join, and so should error if no
+ invite exists (and they're not a member of the group)
+ """
+
+ yield self.check_group_is_ours(group_id, requester_user_id, and_exists=True)
+
+ is_invited = yield self.store.is_user_invited_to_local_group(
+ group_id, requester_user_id,
+ )
+ if not is_invited:
+ raise SynapseError(403, "User not invited to group")
+
+ local_attestation = yield self._add_user(group_id, requester_user_id, content)
+
+ defer.returnValue({
+ "state": "join",
+ "attestation": local_attestation,
+ })
+
+ @defer.inlineCallbacks
+ def join_group(self, group_id, requester_user_id, content):
+ """User tries to join the group.
+
+ This will error if the group requires an invite/knock to join
+ """
+
+ group_info = yield self.check_group_is_ours(
+ group_id, requester_user_id, and_exists=True
+ )
+ if group_info['join_policy'] != "open":
+ raise SynapseError(403, "Group is not publicly joinable")
+
+ local_attestation = yield self._add_user(group_id, requester_user_id, content)
+
defer.returnValue({
"state": "join",
"attestation": local_attestation,
@@ -835,6 +896,31 @@ class GroupsServerHandler(object):
})
+def _parse_join_policy_from_contents(content):
+    """Given the content of a request, return the specified join policy, or None
+ """
+
+ join_policy_dict = content.get("m.join_policy")
+ if join_policy_dict:
+ return _parse_join_policy_dict(join_policy_dict)
+ else:
+ return None
+
+
+def _parse_join_policy_dict(join_policy_dict):
+    """Given the "m.join_policy" dict of a request, return the join policy
+    specified, defaulting to "invite" if the type is omitted
+ """
+ join_policy_type = join_policy_dict.get("type")
+ if not join_policy_type:
+ return "invite"
+
+ if join_policy_type not in ("invite", "open"):
+ raise SynapseError(
+ 400, "Synapse only supports 'invite'/'open' join rule"
+ )
+ return join_policy_type
+
+
def _parse_visibility_from_contents(content):
"""Given a content for a request parse out whether the entity should be
public or not
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index e4d0cc8b02..977993e7d4 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -90,6 +91,8 @@ class GroupsLocalHandler(object):
get_group_role = _create_rerouter("get_group_role")
get_group_roles = _create_rerouter("get_group_roles")
+ set_group_join_policy = _create_rerouter("set_group_join_policy")
+
@defer.inlineCallbacks
def get_group_summary(self, group_id, requester_user_id):
"""Get the group summary for a group.
@@ -226,7 +229,45 @@ class GroupsLocalHandler(object):
def join_group(self, group_id, user_id, content):
"""Request to join a group
"""
- raise NotImplementedError() # TODO
+ if self.is_mine_id(group_id):
+ yield self.groups_server_handler.join_group(
+ group_id, user_id, content
+ )
+ local_attestation = None
+ remote_attestation = None
+ else:
+ local_attestation = self.attestations.create_attestation(group_id, user_id)
+ content["attestation"] = local_attestation
+
+ res = yield self.transport_client.join_group(
+ get_domain_from_id(group_id), group_id, user_id, content,
+ )
+
+ remote_attestation = res["attestation"]
+
+ yield self.attestations.verify_attestation(
+ remote_attestation,
+ group_id=group_id,
+ user_id=user_id,
+ server_name=get_domain_from_id(group_id),
+ )
+
+            # TODO: Check that the group is public and we're being added publicly
+ is_publicised = content.get("publicise", False)
+
+ token = yield self.store.register_user_group_membership(
+ group_id, user_id,
+ membership="join",
+ is_admin=False,
+ local_attestation=local_attestation,
+ remote_attestation=remote_attestation,
+ is_publicised=is_publicised,
+ )
+ self.notifier.on_new_event(
+ "groups_key", token, users=[user_id],
+ )
+
+ defer.returnValue({})
@defer.inlineCallbacks
def accept_invite(self, group_id, user_id, content):
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 9977be8831..c45142d38d 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -852,6 +852,14 @@ class RoomMemberMasterHandler(RoomMemberHandler):
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join
"""
+ # filter ourselves out of remote_room_hosts: do_invite_join ignores it
+ # and if it is the only entry we'd like to return a 404 rather than a
+ # 500.
+
+ remote_room_hosts = [
+ host for host in remote_room_hosts if host != self.hs.hostname
+ ]
+
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 9145405cb0..60a29081e8 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -286,7 +286,8 @@ class MatrixFederationHttpClient(object):
headers_dict[b"Authorization"] = auth_headers
@defer.inlineCallbacks
- def put_json(self, destination, path, data={}, json_data_callback=None,
+ def put_json(self, destination, path, args={}, data={},
+ json_data_callback=None,
long_retries=False, timeout=None,
ignore_backoff=False,
backoff_on_404=False):
@@ -296,6 +297,7 @@ class MatrixFederationHttpClient(object):
destination (str): The remote server to send the HTTP request
to.
path (str): The HTTP path.
+ args (dict): query params
data (dict): A dict containing the data that will be used as
the request body. This will be encoded as JSON.
json_data_callback (callable): A callable returning the dict to
@@ -342,6 +344,7 @@ class MatrixFederationHttpClient(object):
path,
body_callback=body_callback,
headers_dict={"Content-Type": ["application/json"]},
+ query_bytes=encode_query_args(args),
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
@@ -373,6 +376,7 @@ class MatrixFederationHttpClient(object):
giving up. None indicates no timeout.
ignore_backoff (bool): true to ignore the historical backoff data and
try the request anyway.
+ args (dict): query params
Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
diff --git a/synapse/http/server.py b/synapse/http/server.py
index f19c068ef6..64e083ebfc 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -113,6 +113,11 @@ response_db_sched_duration = metrics.register_counter(
"response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
)
+# size in bytes of the response written
+response_size = metrics.register_counter(
+ "response_size", labels=["method", "servlet", "tag"]
+)
+
_next_request_id = 0
@@ -426,6 +431,8 @@ class RequestMetrics(object):
context.db_sched_duration_ms / 1000., request.method, self.name, tag
)
+ response_size.inc_by(request.sentLength, request.method, self.name, tag)
+
class RootRedirect(resource.Resource):
"""Redirects the root '/' path to another path."""
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 70d788deea..d06cbdc35e 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -655,7 +655,12 @@ class RoomMembershipRestServlet(ClientV1RestServlet):
content=event_content,
)
- defer.returnValue((200, {}))
+ return_value = {}
+
+ if membership_action == "join":
+ return_value["room_id"] = room_id
+
+ defer.returnValue((200, return_value))
def _has_3pid_invite_keys(self, content):
for key in {"id_server", "medium", "address"}:
diff --git a/synapse/rest/client/v2_alpha/groups.py b/synapse/rest/client/v2_alpha/groups.py
index f762dbfa9a..3bb1ec2af6 100644
--- a/synapse/rest/client/v2_alpha/groups.py
+++ b/synapse/rest/client/v2_alpha/groups.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -401,6 +402,32 @@ class GroupInvitedUsersServlet(RestServlet):
defer.returnValue((200, result))
+class GroupSettingJoinPolicyServlet(RestServlet):
+ """Set group join policy
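+
+    An illustrative request (the group id is made up) would be:
+
+        PUT /_matrix/client/unstable/groups/+friends:example.com/settings/m.join_policy
+        {"m.join_policy": {"type": "open"}}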
+ """
+ PATTERNS = client_v2_patterns("/groups/(?P<group_id>[^/]*)/settings/m.join_policy$")
+
+ def __init__(self, hs):
+ super(GroupSettingJoinPolicyServlet, self).__init__()
+ self.auth = hs.get_auth()
+ self.groups_handler = hs.get_groups_local_handler()
+
+ @defer.inlineCallbacks
+ def on_PUT(self, request, group_id):
+ requester = yield self.auth.get_user_by_req(request)
+ requester_user_id = requester.user.to_string()
+
+ content = parse_json_object_from_request(request)
+
+ result = yield self.groups_handler.set_group_join_policy(
+ group_id,
+ requester_user_id,
+ content,
+ )
+
+ defer.returnValue((200, result))
+
+
class GroupCreateServlet(RestServlet):
"""Create a group
"""
@@ -738,6 +765,7 @@ def register_servlets(hs, http_server):
GroupInvitedUsersServlet(hs).register(http_server)
GroupUsersServlet(hs).register(http_server)
GroupRoomServlet(hs).register(http_server)
+ GroupSettingJoinPolicyServlet(hs).register(http_server)
GroupCreateServlet(hs).register(http_server)
GroupAdminRoomsServlet(hs).register(http_server)
GroupAdminRoomsConfigServlet(hs).register(http_server)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index de00cae447..4800584b59 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
-
from synapse.storage.devices import DeviceStore
from .appservice import (
ApplicationServiceStore, ApplicationServiceTransactionStore
@@ -244,13 +242,12 @@ class DataStore(RoomMemberStore, RoomStore,
return [UserPresenceState(**row) for row in rows]
- @defer.inlineCallbacks
def count_daily_users(self):
"""
Counts the number of users who used this homeserver in the last 24 hours.
"""
def _count_users(txn):
- yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),
+ yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
sql = """
SELECT COALESCE(count(*), 0) FROM (
@@ -264,8 +261,91 @@ class DataStore(RoomMemberStore, RoomStore,
count, = txn.fetchone()
return count
- ret = yield self.runInteraction("count_users", _count_users)
- defer.returnValue(ret)
+ return self.runInteraction("count_users", _count_users)
+
+ def count_r30_users(self):
+ """
+        Counts the number of 30 day retained users, defined as:
+        * users who created their account more than 30 days ago
+        * who have been seen within the last 30 days
+        * whose last activity was more than 30 days after their account
+          was created
+
+        Returns counts globally as well as broken down by platform
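+        (e.g. {"all": 120, "android": 55, "web": 45}; values illustrative)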
+ """
+ def _count_r30_users(txn):
+ thirty_days_in_secs = 86400 * 30
+            now = int(self._clock.time())
+ thirty_days_ago_in_secs = now - thirty_days_in_secs
+
+ sql = """
+ SELECT platform, COALESCE(count(*), 0) FROM (
+ SELECT
+ users.name, platform, users.creation_ts * 1000,
+ MAX(uip.last_seen)
+ FROM users
+ INNER JOIN (
+ SELECT
+ user_id,
+ last_seen,
+ CASE
+ WHEN user_agent LIKE '%Android%' THEN 'android'
+ WHEN user_agent LIKE '%iOS%' THEN 'ios'
+ WHEN user_agent LIKE '%Electron%' THEN 'electron'
+ WHEN user_agent LIKE '%Mozilla%' THEN 'web'
+ WHEN user_agent LIKE '%Gecko%' THEN 'web'
+ ELSE 'unknown'
+ END
+ AS platform
+ FROM user_ips
+ ) uip
+ ON users.name = uip.user_id
+ AND users.appservice_id is NULL
+ AND users.creation_ts < ?
+ AND uip.last_seen/1000 > ?
+ AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
+ GROUP BY users.name, platform, users.creation_ts
+ ) u GROUP BY platform
+ """
+
+ results = {}
+ txn.execute(sql, (thirty_days_ago_in_secs,
+ thirty_days_ago_in_secs))
+
+ for row in txn:
+                results[row[0]] = row[1]
+
+ sql = """
+ SELECT COALESCE(count(*), 0) FROM (
+ SELECT users.name, users.creation_ts * 1000,
+ MAX(uip.last_seen)
+ FROM users
+ INNER JOIN (
+ SELECT
+ user_id,
+ last_seen
+ FROM user_ips
+ ) uip
+ ON users.name = uip.user_id
+ AND appservice_id is NULL
+ AND users.creation_ts < ?
+ AND uip.last_seen/1000 > ?
+ AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
+ GROUP BY users.name, users.creation_ts
+ ) u
+ """
+
+ txn.execute(sql, (thirty_days_ago_in_secs,
+ thirty_days_ago_in_secs))
+
+ count, = txn.fetchone()
+ results['all'] = count
+
+ return results
+
+ return self.runInteraction("count_r30_users", _count_r30_users)
def get_users(self):
"""Function to reterive a list of users in users table.
diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py
index a03d1d6104..7b44dae0fc 100644
--- a/synapse/storage/client_ips.py
+++ b/synapse/storage/client_ips.py
@@ -48,6 +48,13 @@ class ClientIpStore(background_updates.BackgroundUpdateStore):
columns=["user_id", "device_id", "last_seen"],
)
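+        # user_ips is queried by (user_id, last_seen) for the r30 retention
+        # stats, so add an index covering those columns.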
+ self.register_background_index_update(
+ "user_ips_last_seen_index",
+ index_name="user_ips_last_seen",
+ table="user_ips",
+ columns=["user_id", "last_seen"],
+ )
+
# (user_id, access_token, ip) -> (user_agent, device_id, last_seen)
self._batch_row_update = {}
diff --git a/synapse/storage/group_server.py b/synapse/storage/group_server.py
index d03858234b..da05ccb027 100644
--- a/synapse/storage/group_server.py
+++ b/synapse/storage/group_server.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -29,6 +30,24 @@ _DEFAULT_ROLE_ID = ""
class GroupServerStore(SQLBaseStore):
+ def set_group_join_policy(self, group_id, join_policy):
+ """Set the join policy of a group.
+
+ join_policy can be one of:
+ * "invite"
+ * "open"
+ """
+ return self._simple_update_one(
+ table="groups",
+ keyvalues={
+ "group_id": group_id,
+ },
+ updatevalues={
+ "join_policy": join_policy,
+ },
+ desc="set_group_join_policy",
+ )
+
def get_group(self, group_id):
return self._simple_select_one(
table="groups",
@@ -36,10 +55,11 @@ class GroupServerStore(SQLBaseStore):
"group_id": group_id,
},
retcols=(
- "name", "short_description", "long_description", "avatar_url", "is_public"
+ "name", "short_description", "long_description",
+ "avatar_url", "is_public", "join_policy",
),
allow_none=True,
- desc="is_user_in_group",
+ desc="get_group",
)
def get_users_in_group(self, group_id, include_private=False):
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index c845a0cec5..04411a665f 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -25,7 +26,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 47
+SCHEMA_VERSION = 48
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 908551d6d9..740c036975 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -594,7 +594,8 @@ class RoomStore(RoomWorkerStore, SearchStore):
while next_token:
sql = """
- SELECT stream_ordering, content FROM events
+ SELECT stream_ordering, json FROM events
+ JOIN event_json USING (event_id)
WHERE room_id = ?
AND stream_ordering < ?
AND contains_url = ? AND outlier = ?
@@ -606,8 +607,8 @@ class RoomStore(RoomWorkerStore, SearchStore):
next_token = None
for stream_ordering, content_json in txn:
next_token = stream_ordering
- content = json.loads(content_json)
-
+ event_json = json.loads(content_json)
+ content = event_json["content"]
content_url = content.get("url")
thumbnail_url = content.get("info", {}).get("thumbnail_url")
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index d662d1cfc0..6a861943a2 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -645,8 +645,9 @@ class RoomMemberStore(RoomMemberWorkerStore):
def add_membership_profile_txn(txn):
sql = ("""
- SELECT stream_ordering, event_id, events.room_id, content
+ SELECT stream_ordering, event_id, events.room_id, event_json.json
FROM events
+ INNER JOIN event_json USING (event_id)
INNER JOIN room_memberships USING (event_id)
WHERE ? <= stream_ordering AND stream_ordering < ?
AND type = 'm.room.member'
@@ -667,7 +668,8 @@ class RoomMemberStore(RoomMemberWorkerStore):
event_id = row["event_id"]
room_id = row["room_id"]
try:
- content = json.loads(row["content"])
+ event_json = json.loads(row["json"])
+ content = event_json['content']
except Exception:
continue
diff --git a/synapse/storage/schema/delta/48/add_user_ips_last_seen_index.sql b/synapse/storage/schema/delta/48/add_user_ips_last_seen_index.sql
new file mode 100644
index 0000000000..9248b0b24a
--- /dev/null
+++ b/synapse/storage/schema/delta/48/add_user_ips_last_seen_index.sql
@@ -0,0 +1,17 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+ VALUES ('user_ips_last_seen_index', '{}');
diff --git a/synapse/storage/schema/delta/48/groups_joinable.sql b/synapse/storage/schema/delta/48/groups_joinable.sql
new file mode 100644
index 0000000000..ce26eaf0c9
--- /dev/null
+++ b/synapse/storage/schema/delta/48/groups_joinable.sql
@@ -0,0 +1,22 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * This isn't a real ENUM because sqlite doesn't support it,
+ * so the allowed values are validated at the python level when
+ * the policy is set. Existing rows pick up the default of
+ * 'invite', which preserves their previous behaviour.
+ */
+ALTER TABLE groups ADD COLUMN join_policy TEXT NOT NULL DEFAULT 'invite';
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 984643b057..426cbe6e1a 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -75,8 +75,9 @@ class SearchStore(BackgroundUpdateStore):
def reindex_search_txn(txn):
sql = (
- "SELECT stream_ordering, event_id, room_id, type, content, "
+ "SELECT stream_ordering, event_id, room_id, type, json, "
" origin_server_ts FROM events"
+ " JOIN event_json USING (event_id)"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"
@@ -104,7 +105,8 @@ class SearchStore(BackgroundUpdateStore):
stream_ordering = row["stream_ordering"]
origin_server_ts = row["origin_server_ts"]
try:
- content = json.loads(row["content"])
+ event_json = json.loads(row["json"])
+ content = event_json["content"]
except Exception:
continue
diff --git a/synapse/types.py b/synapse/types.py
index f1f41ccf90..7cb24cecb2 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -12,11 +12,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import string
from synapse.api.errors import SynapseError
from collections import namedtuple
-import re
class Requester(namedtuple("Requester", [
@@ -214,8 +214,7 @@ class GroupID(DomainSpecificString):
return group_id
-# A regex that matches any valid mxid characters
-MXID_LOCALPART_REGEX = re.compile("^[_\-./=a-z0-9]*$")
+mxid_localpart_allowed_characters = set("_-./=" + string.ascii_lowercase + string.digits)
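+
+# e.g. "alice.smith_01" contains only allowed characters, whereas "Alice"
+# does not: uppercase letters are not valid in an mxid localpart.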
def contains_invalid_mxid_characters(localpart):
@@ -227,7 +226,7 @@ def contains_invalid_mxid_characters(localpart):
Returns:
bool: True if there are any naughty characters
"""
- return not MXID_LOCALPART_REGEX.match(localpart)
+ return any(c not in mxid_localpart_allowed_characters for c in localpart)
class StreamToken(
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index bf3a66eae4..68285a7594 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -39,12 +40,11 @@ _CacheSentinel = object()
class CacheEntry(object):
__slots__ = [
- "deferred", "sequence", "callbacks", "invalidated"
+ "deferred", "callbacks", "invalidated"
]
- def __init__(self, deferred, sequence, callbacks):
+ def __init__(self, deferred, callbacks):
self.deferred = deferred
- self.sequence = sequence
self.callbacks = set(callbacks)
self.invalidated = False
@@ -62,7 +62,6 @@ class Cache(object):
"max_entries",
"name",
"keylen",
- "sequence",
"thread",
"metrics",
"_pending_deferred_cache",
@@ -80,7 +79,6 @@ class Cache(object):
self.name = name
self.keylen = keylen
- self.sequence = 0
self.thread = None
self.metrics = register_cache(name, self.cache)
@@ -113,11 +111,10 @@ class Cache(object):
callbacks = [callback] if callback else []
val = self._pending_deferred_cache.get(key, _CacheSentinel)
if val is not _CacheSentinel:
- if val.sequence == self.sequence:
- val.callbacks.update(callbacks)
- if update_metrics:
- self.metrics.inc_hits()
- return val.deferred
+ val.callbacks.update(callbacks)
+ if update_metrics:
+ self.metrics.inc_hits()
+ return val.deferred
val = self.cache.get(key, _CacheSentinel, callbacks=callbacks)
if val is not _CacheSentinel:
@@ -137,12 +134,9 @@ class Cache(object):
self.check_thread()
entry = CacheEntry(
deferred=value,
- sequence=self.sequence,
callbacks=callbacks,
)
- entry.callbacks.update(callbacks)
-
existing_entry = self._pending_deferred_cache.pop(key, None)
if existing_entry:
existing_entry.invalidate()
@@ -150,13 +144,25 @@ class Cache(object):
self._pending_deferred_cache[key] = entry
def shuffle(result):
- if self.sequence == entry.sequence:
- existing_entry = self._pending_deferred_cache.pop(key, None)
- if existing_entry is entry:
- self.cache.set(key, result, entry.callbacks)
- else:
- entry.invalidate()
+ existing_entry = self._pending_deferred_cache.pop(key, None)
+ if existing_entry is entry:
+ self.cache.set(key, result, entry.callbacks)
else:
+ # oops, the _pending_deferred_cache has been updated since
+ # we started our query, so we are out of date.
+ #
+ # Better put back whatever we took out. (We do it this way
+ # round, rather than peeking into the _pending_deferred_cache
+ # and then removing on a match, to make the common case faster)
+ if existing_entry is not None:
+ self._pending_deferred_cache[key] = existing_entry
+
+ # we're not going to put this entry into the cache, so need
+ # to make sure that the invalidation callbacks are called.
+ # That was probably done when _pending_deferred_cache was
+ # updated, but it's possible that `set` was called without
+ # `invalidate` being previously called, in which case it may
+ # not have been. Either way, let's double-check now.
entry.invalidate()
return result
@@ -168,25 +174,29 @@ class Cache(object):
def invalidate(self, key):
self.check_thread()
+ self.cache.pop(key, None)
- # Increment the sequence number so that any SELECT statements that
- # raced with the INSERT don't update the cache (SYN-369)
- self.sequence += 1
+ # if we have a pending lookup for this key, remove it from the
+ # _pending_deferred_cache, which will (a) stop it being returned
+ # for future queries and (b) stop it being persisted as a proper entry
+ # in self.cache.
entry = self._pending_deferred_cache.pop(key, None)
+
+ # run the invalidation callbacks now, rather than waiting for the
+ # deferred to resolve.
if entry:
entry.invalidate()
- self.cache.pop(key, None)
-
def invalidate_many(self, key):
self.check_thread()
if not isinstance(key, tuple):
raise TypeError(
"The cache key must be a tuple not %r" % (type(key),)
)
- self.sequence += 1
self.cache.del_multi(key)
+ # if we have a pending lookup for this key, remove it from the
+ # _pending_deferred_cache, as above
entry_dict = self._pending_deferred_cache.pop(key, None)
if entry_dict is not None:
for entry in iterate_tree_cache_entry(entry_dict):
@@ -194,8 +204,10 @@ class Cache(object):
def invalidate_all(self):
self.check_thread()
- self.sequence += 1
self.cache.clear()
+ for entry in self._pending_deferred_cache.itervalues():
+ entry.invalidate()
+ self._pending_deferred_cache.clear()
class _CacheDescriptorBase(object):
diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py
index 3f14ab503f..2516fe40f4 100644
--- a/tests/util/caches/test_descriptors.py
+++ b/tests/util/caches/test_descriptors.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,6 +13,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from functools import partial
import logging
import mock
@@ -25,6 +27,50 @@ from tests import unittest
logger = logging.getLogger(__name__)
+class CacheTestCase(unittest.TestCase):
+ def test_invalidate_all(self):
+ cache = descriptors.Cache("testcache")
+
+ callback_record = [False, False]
+
+ def record_callback(idx):
+ callback_record[idx] = True
+
+ # add a couple of pending entries
+ d1 = defer.Deferred()
+ cache.set("key1", d1, partial(record_callback, 0))
+
+ d2 = defer.Deferred()
+ cache.set("key2", d2, partial(record_callback, 1))
+
+ # lookup should return the deferreds
+ self.assertIs(cache.get("key1"), d1)
+ self.assertIs(cache.get("key2"), d2)
+
+ # let one of the lookups complete
+ d2.callback("result2")
+ self.assertEqual(cache.get("key2"), "result2")
+
+ # now do the invalidation
+ cache.invalidate_all()
+
+ # lookup should return none
+ self.assertIsNone(cache.get("key1", None))
+ self.assertIsNone(cache.get("key2", None))
+
+ # both callbacks should have been callbacked
+ self.assertTrue(
+ callback_record[0], "Invalidation callback for key1 not called",
+ )
+ self.assertTrue(
+ callback_record[1], "Invalidation callback for key2 not called",
+ )
+
+ # letting the other lookup complete should do nothing
+ d1.callback("result1")
+ self.assertIsNone(cache.get("key1", None))
+
+
class DescriptorTestCase(unittest.TestCase):
@defer.inlineCallbacks
def test_cache(self):
|