diff --git a/.github/workflows/latest_deps.yml b/.github/workflows/latest_deps.yml
index 07229e56bd..8366ac9393 100644
--- a/.github/workflows/latest_deps.yml
+++ b/.github/workflows/latest_deps.yml
@@ -84,6 +84,12 @@ jobs:
if: ${{ matrix.postgres-version }}
timeout-minutes: 2
run: until pg_isready -h localhost; do sleep 1; done
+
+ # We nuke the local copy, as we've installed synapse into the virtualenv
+ # (rather than use an editable install, which we no longer support). If we
+ # don't do this then python can't find the native lib.
+ - run: rm -rf synapse/
+
- run: python -m twisted.trial --jobs=2 tests
env:
SYNAPSE_POSTGRES: ${{ matrix.database == 'postgres' || '' }}
@@ -128,6 +134,14 @@ jobs:
steps:
- uses: actions/checkout@v2
+
+ - name: Install Rust
+ uses: actions-rs/toolchain@v1
+ with:
+ toolchain: stable
+ override: true
+ - uses: Swatinem/rust-cache@v2
+
- name: Ensure sytest runs `pip install`
# Delete the lockfile so sytest will `pip install` rather than `poetry install`
run: rm /src/poetry.lock
diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml
index 50dc8e30d4..7c4ae3d7ff 100644
--- a/.github/workflows/tests.yml
+++ b/.github/workflows/tests.yml
@@ -10,6 +10,23 @@ concurrency:
cancel-in-progress: true
jobs:
+ # Job to detect what has changed so we don't run e.g. Rust checks on PRs that
+ # don't modify Rust code.
+ changes:
+ runs-on: ubuntu-latest
+ outputs:
+ rust: ${{ !startsWith(github.ref, 'refs/pull/') || steps.filter.outputs.rust }}
+ steps:
+ - uses: dorny/paths-filter@v2
+ id: filter
+ # We only check on PRs
+ if: startsWith(github.ref, 'refs/pull/')
+ with:
+ filters: |
+ rust:
+ - 'rust/**'
+ - 'Cargo.toml'
+
check-sampleconfig:
runs-on: ubuntu-latest
steps:
@@ -65,10 +82,54 @@ jobs:
extras: "all"
- run: poetry run scripts-dev/check_pydantic_models.py
+ lint-clippy:
+ runs-on: ubuntu-latest
+ needs: changes
+ if: ${{ needs.changes.outputs.rust == 'true' }}
+
+ steps:
+ - uses: actions/checkout@v2
+
+ - name: Install Rust
+ uses: actions-rs/toolchain@v1
+ with:
+ toolchain: 1.61.0
+ override: true
+ components: clippy
+ - uses: Swatinem/rust-cache@v2
+
+ - run: cargo clippy
+
+ lint-rustfmt:
+ runs-on: ubuntu-latest
+ needs: changes
+ if: ${{ needs.changes.outputs.rust == 'true' }}
+
+ steps:
+ - uses: actions/checkout@v2
+
+ - name: Install Rust
+ uses: actions-rs/toolchain@v1
+ with:
+ toolchain: 1.61.0
+ override: true
+ components: rustfmt
+ - uses: Swatinem/rust-cache@v2
+
+ - run: cargo fmt --check
+
# Dummy step to gate other tests on without repeating the whole list
linting-done:
if: ${{ !cancelled() }} # Run this even if prior jobs were skipped
- needs: [lint, lint-crlf, lint-newsfile, lint-pydantic, check-sampleconfig, check-schema-delta]
+ needs:
+ - lint
+ - lint-crlf
+ - lint-newsfile
+ - lint-pydantic
+ - check-sampleconfig
+ - check-schema-delta
+ - lint-clippy
+ - lint-rustfmt
runs-on: ubuntu-latest
steps:
- run: "true"
@@ -384,6 +445,25 @@ jobs:
shell: bash
name: Run Complement Tests
+ cargo-test:
+ if: ${{ needs.changes.outputs.rust == 'true' }}
+ runs-on: ubuntu-latest
+ needs:
+ - linting-done
+ - changes
+
+ steps:
+ - uses: actions/checkout@v2
+
+ - name: Install Rust
+ uses: actions-rs/toolchain@v1
+ with:
+ toolchain: 1.61.0
+ override: true
+ - uses: Swatinem/rust-cache@v2
+
+ - run: cargo test
+
# a job which marks all the other jobs as complete, thus allowing PRs to be merged.
tests-done:
if: ${{ always() }}
@@ -398,6 +478,7 @@ jobs:
- export-data
- portdb
- complement
+ - cargo-test
runs-on: ubuntu-latest
steps:
- uses: matrix-org/done-action@v2
diff --git a/changelog.d/13717.misc b/changelog.d/13717.misc
new file mode 100644
index 0000000000..07ace50b12
--- /dev/null
+++ b/changelog.d/13717.misc
@@ -0,0 +1 @@
+Add experimental configuration option to allow disabling legacy Prometheus metric names.
\ No newline at end of file
diff --git a/changelog.d/13718.misc b/changelog.d/13718.misc
new file mode 100644
index 0000000000..07ace50b12
--- /dev/null
+++ b/changelog.d/13718.misc
@@ -0,0 +1 @@
+Add experimental configuration option to allow disabling legacy Prometheus metric names.
\ No newline at end of file
diff --git a/changelog.d/13741.feature b/changelog.d/13741.feature
new file mode 100644
index 0000000000..dff46f373f
--- /dev/null
+++ b/changelog.d/13741.feature
@@ -0,0 +1 @@
+Record the timestamp when a user accepts the consent, if [consent tracking](https://matrix-org.github.io/synapse/latest/consent_tracking.html) is used.
\ No newline at end of file
diff --git a/changelog.d/13743.misc b/changelog.d/13743.misc
new file mode 100644
index 0000000000..2e0dd68a0f
--- /dev/null
+++ b/changelog.d/13743.misc
@@ -0,0 +1 @@
+Add a stub Rust crate.
diff --git a/changelog.d/13746.bugfix b/changelog.d/13746.bugfix
new file mode 100644
index 0000000000..b692af8fd5
--- /dev/null
+++ b/changelog.d/13746.bugfix
@@ -0,0 +1 @@
+Fix a long-standing bug where Synapse would fail to handle malformed user IDs or room aliases gracefully in certain cases.
diff --git a/changelog.d/13748.misc b/changelog.d/13748.misc
new file mode 100644
index 0000000000..2f419bb659
--- /dev/null
+++ b/changelog.d/13748.misc
@@ -0,0 +1 @@
+Avoid raising an error due to malformed user IDs in `get_current_hosts_in_room`. Malformed user IDs cannot currently join a room, so this error would not be hit.
diff --git a/changelog.d/13750.misc b/changelog.d/13750.misc
new file mode 100644
index 0000000000..3bccc21fc5
--- /dev/null
+++ b/changelog.d/13750.misc
@@ -0,0 +1 @@
+Update the docstrings for `get_users_in_room` and `get_current_hosts_in_room` to explain the impact of partial state.
diff --git a/changelog.d/13752.misc b/changelog.d/13752.misc
new file mode 100644
index 0000000000..7624861b9f
--- /dev/null
+++ b/changelog.d/13752.misc
@@ -0,0 +1 @@
+Use an additional database query when persisting receipts.
diff --git a/changelog.d/13754.misc b/changelog.d/13754.misc
new file mode 100644
index 0000000000..662ee00e99
--- /dev/null
+++ b/changelog.d/13754.misc
@@ -0,0 +1 @@
+Update type hints to mark some collections as read-only.
diff --git a/changelog.d/13756.misc b/changelog.d/13756.misc
new file mode 100644
index 0000000000..06e9cd09bf
--- /dev/null
+++ b/changelog.d/13756.misc
@@ -0,0 +1 @@
+Remove unused Prometheus recording rules from `synapse-v2.rules` and add comments describing where the rest are used.
\ No newline at end of file
diff --git a/changelog.d/13760.removal b/changelog.d/13760.removal
new file mode 100644
index 0000000000..624e7c3678
--- /dev/null
+++ b/changelog.d/13760.removal
@@ -0,0 +1 @@
+Synapse will now refuse to start if configured to use SQLite < 3.27.
diff --git a/changelog.d/13763.misc b/changelog.d/13763.misc
new file mode 100644
index 0000000000..2e0dd68a0f
--- /dev/null
+++ b/changelog.d/13763.misc
@@ -0,0 +1 @@
+Add a stub Rust crate.
diff --git a/contrib/prometheus/synapse-v1.rules b/contrib/prometheus/synapse-v1.rules
deleted file mode 100644
index 4c900ba537..0000000000
--- a/contrib/prometheus/synapse-v1.rules
+++ /dev/null
@@ -1,21 +0,0 @@
-synapse_federation_transaction_queue_pendingEdus:total = sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)
-synapse_federation_transaction_queue_pendingPdus:total = sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)
-
-synapse_http_server_request_count:method{servlet=""} = sum(synapse_http_server_request_count) by (method)
-synapse_http_server_request_count:servlet{method=""} = sum(synapse_http_server_request_count) by (servlet)
-
-synapse_http_server_request_count:total{servlet=""} = sum(synapse_http_server_request_count:by_method) by (servlet)
-
-synapse_cache:hit_ratio_5m = rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])
-synapse_cache:hit_ratio_30s = rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s])
-
-synapse_federation_client_sent{type="EDU"} = synapse_federation_client_sent_edus + 0
-synapse_federation_client_sent{type="PDU"} = synapse_federation_client_sent_pdu_destinations:count + 0
-synapse_federation_client_sent{type="Query"} = sum(synapse_federation_client_sent_queries) by (job)
-
-synapse_federation_server_received{type="EDU"} = synapse_federation_server_received_edus + 0
-synapse_federation_server_received{type="PDU"} = synapse_federation_server_received_pdus + 0
-synapse_federation_server_received{type="Query"} = sum(synapse_federation_server_received_queries) by (job)
-
-synapse_federation_transaction_queue_pending{type="EDU"} = synapse_federation_transaction_queue_pending_edus + 0
-synapse_federation_transaction_queue_pending{type="PDU"} = synapse_federation_transaction_queue_pending_pdus + 0
diff --git a/contrib/prometheus/synapse-v2.rules b/contrib/prometheus/synapse-v2.rules
index 7e405bf7f0..cbe6f7beba 100644
--- a/contrib/prometheus/synapse-v2.rules
+++ b/contrib/prometheus/synapse-v2.rules
@@ -1,55 +1,35 @@
groups:
- name: synapse
rules:
- - record: "synapse_federation_transaction_queue_pendingEdus:total"
- expr: "sum(synapse_federation_transaction_queue_pendingEdus or absent(synapse_federation_transaction_queue_pendingEdus)*0)"
- - record: "synapse_federation_transaction_queue_pendingPdus:total"
- expr: "sum(synapse_federation_transaction_queue_pendingPdus or absent(synapse_federation_transaction_queue_pendingPdus)*0)"
- - record: 'synapse_http_server_request_count:method'
- labels:
- servlet: ""
- expr: "sum(synapse_http_server_request_count) by (method)"
- - record: 'synapse_http_server_request_count:servlet'
- labels:
- method: ""
- expr: 'sum(synapse_http_server_request_count) by (servlet)'
-
- - record: 'synapse_http_server_request_count:total'
- labels:
- servlet: ""
- expr: 'sum(synapse_http_server_request_count:by_method) by (servlet)'
-
- - record: 'synapse_cache:hit_ratio_5m'
- expr: 'rate(synapse_util_caches_cache:hits[5m]) / rate(synapse_util_caches_cache:total[5m])'
- - record: 'synapse_cache:hit_ratio_30s'
- expr: 'rate(synapse_util_caches_cache:hits[30s]) / rate(synapse_util_caches_cache:total[30s])'
-
+ # These 3 rules are used in the included Prometheus console
- record: 'synapse_federation_client_sent'
labels:
type: "EDU"
- expr: 'synapse_federation_client_sent_edus + 0'
+ expr: 'synapse_federation_client_sent_edus_total + 0'
- record: 'synapse_federation_client_sent'
labels:
type: "PDU"
- expr: 'synapse_federation_client_sent_pdu_destinations:count + 0'
+ expr: 'synapse_federation_client_sent_pdu_destinations_count_total + 0'
- record: 'synapse_federation_client_sent'
labels:
type: "Query"
expr: 'sum(synapse_federation_client_sent_queries) by (job)'
+ # These 3 rules are used in the included Prometheus console
- record: 'synapse_federation_server_received'
labels:
type: "EDU"
- expr: 'synapse_federation_server_received_edus + 0'
+ expr: 'synapse_federation_server_received_edus_total + 0'
- record: 'synapse_federation_server_received'
labels:
type: "PDU"
- expr: 'synapse_federation_server_received_pdus + 0'
+ expr: 'synapse_federation_server_received_pdus_total + 0'
- record: 'synapse_federation_server_received'
labels:
type: "Query"
expr: 'sum(synapse_federation_server_received_queries) by (job)'
+ # These 2 rules are used in the included Prometheus console
- record: 'synapse_federation_transaction_queue_pending'
labels:
type: "EDU"
@@ -59,20 +39,25 @@ groups:
type: "PDU"
expr: 'synapse_federation_transaction_queue_pending_pdus + 0'
+ # These 3 rules are used in the included Grafana dashboard
- record: synapse_storage_events_persisted_by_source_type
- expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep{origin_type="remote"})
+ expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep_total{origin_type="remote"})
labels:
type: remote
- record: synapse_storage_events_persisted_by_source_type
- expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep{origin_entity="*client*",origin_type="local"})
+ expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep_total{origin_entity="*client*",origin_type="local"})
labels:
type: local
- record: synapse_storage_events_persisted_by_source_type
- expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep{origin_entity!="*client*",origin_type="local"})
+ expr: sum without(type, origin_type, origin_entity) (synapse_storage_events_persisted_events_sep_total{origin_entity!="*client*",origin_type="local"})
labels:
type: bridges
+
+ # This rule is used in the included Grafana dashboard
- record: synapse_storage_events_persisted_by_event_type
- expr: sum without(origin_entity, origin_type) (synapse_storage_events_persisted_events_sep)
+ expr: sum without(origin_entity, origin_type) (synapse_storage_events_persisted_events_sep_total)
+
+ # This rule is used in the included Grafana dashboard
- record: synapse_storage_events_persisted_by_origin
- expr: sum without(type) (synapse_storage_events_persisted_events_sep)
+ expr: sum without(type) (synapse_storage_events_persisted_events_sep_total)
diff --git a/docs/admin_api/user_admin_api.md b/docs/admin_api/user_admin_api.md
index c1ca0c8a64..975f05c929 100644
--- a/docs/admin_api/user_admin_api.md
+++ b/docs/admin_api/user_admin_api.md
@@ -42,6 +42,7 @@ It returns a JSON body like the following:
"appservice_id": null,
"consent_server_notice_sent": null,
"consent_version": null,
+ "consent_ts": null,
"external_ids": [
{
"auth_provider": "<provider1>",
@@ -364,6 +365,7 @@ The following actions are **NOT** performed. The list may be incomplete.
- Remove the user's creation (registration) timestamp
- [Remove rate limit overrides](#override-ratelimiting-for-users)
- Remove from monthly active users
+- Remove the user's consent information (consent version and timestamp)
## Reset password
diff --git a/rust/src/lib.rs b/rust/src/lib.rs
index fc4eb39154..142fc2ed93 100644
--- a/rust/src/lib.rs
+++ b/rust/src/lib.rs
@@ -11,5 +11,6 @@ fn sum_as_string(a: usize, b: usize) -> PyResult<String> {
#[pymodule]
fn synapse_rust(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(sum_as_string, m)?)?;
+
Ok(())
}
diff --git a/synapse/app/phone_stats_home.py b/synapse/app/phone_stats_home.py
index 51c8d15711..53db1e85b3 100644
--- a/synapse/app/phone_stats_home.py
+++ b/synapse/app/phone_stats_home.py
@@ -32,15 +32,15 @@ logger = logging.getLogger("synapse.app.homeserver")
_stats_process: List[Tuple[int, "resource.struct_rusage"]] = []
# Gauges to expose monthly active user control metrics
-current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
+current_mau_gauge = Gauge("synapse_admin_mau_current", "Current MAU")
current_mau_by_service_gauge = Gauge(
"synapse_admin_mau_current_mau_by_service",
"Current MAU by service",
["app_service"],
)
-max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
+max_mau_gauge = Gauge("synapse_admin_mau_max", "MAU Limit")
registered_reserved_users_mau_gauge = Gauge(
- "synapse_admin_mau:registered_reserved_users",
+ "synapse_admin_mau_registered_reserved_users",
"Registered users with reserved threepids",
)
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 8bc60e3e3e..a6cb3ba58f 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -62,12 +62,12 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
sent_pdus_destination_dist_count = Counter(
- "synapse_federation_client_sent_pdu_destinations:count",
+ "synapse_federation_client_sent_pdu_destinations_count",
"Number of PDUs queued for sending to one or more destinations",
)
sent_pdus_destination_dist_total = Counter(
- "synapse_federation_client_sent_pdu_destinations:total",
+ "synapse_federation_client_sent_pdu_destinations",
"Total number of PDUs queued for sending across all destinations",
)
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index d4fe7df533..cf9f19608a 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -70,6 +70,7 @@ class AdminHandler:
"appservice_id",
"consent_server_notice_sent",
"consent_version",
+ "consent_ts",
"user_type",
"is_guest",
}
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2d95b1fa24..5293fa4d0e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -15,6 +15,7 @@ import itertools
import logging
from typing import (
TYPE_CHECKING,
+ AbstractSet,
Any,
Collection,
Dict,
@@ -1413,10 +1414,10 @@ class SyncHandler:
async def _generate_sync_entry_for_device_list(
self,
sync_result_builder: "SyncResultBuilder",
- newly_joined_rooms: Set[str],
- newly_joined_or_invited_or_knocked_users: Set[str],
- newly_left_rooms: Set[str],
- newly_left_users: Set[str],
+ newly_joined_rooms: AbstractSet[str],
+ newly_joined_or_invited_or_knocked_users: AbstractSet[str],
+ newly_left_rooms: AbstractSet[str],
+ newly_left_users: AbstractSet[str],
) -> DeviceListUpdates:
"""Generate the DeviceListUpdates section of sync
@@ -1434,8 +1435,7 @@ class SyncHandler:
user_id = sync_result_builder.sync_config.user.to_string()
since_token = sync_result_builder.since_token
- # We're going to mutate these fields, so lets copy them rather than
- # assume they won't get used later.
+ # Take a copy since these fields will be mutated later.
newly_joined_or_invited_or_knocked_users = set(
newly_joined_or_invited_or_knocked_users
)
@@ -1635,8 +1635,8 @@ class SyncHandler:
async def _generate_sync_entry_for_presence(
self,
sync_result_builder: "SyncResultBuilder",
- newly_joined_rooms: Set[str],
- newly_joined_or_invited_users: Set[str],
+ newly_joined_rooms: AbstractSet[str],
+ newly_joined_or_invited_users: AbstractSet[str],
) -> None:
"""Generates the presence portion of the sync response. Populates the
`sync_result_builder` with the result.
@@ -1694,7 +1694,7 @@ class SyncHandler:
self,
sync_result_builder: "SyncResultBuilder",
account_data_by_room: Dict[str, Dict[str, JsonDict]],
- ) -> Tuple[Set[str], Set[str], Set[str], Set[str]]:
+ ) -> Tuple[AbstractSet[str], AbstractSet[str], AbstractSet[str], AbstractSet[str]]:
"""Generates the rooms portion of the sync response. Populates the
`sync_result_builder` with the result.
@@ -2534,7 +2534,7 @@ class SyncResultBuilder:
archived: List[ArchivedSyncResult] = attr.Factory(list)
to_device: List[JsonDict] = attr.Factory(list)
- def calculate_user_changes(self) -> Tuple[Set[str], Set[str]]:
+ def calculate_user_changes(self) -> Tuple[AbstractSet[str], AbstractSet[str]]:
"""Work out which other users have joined or left rooms we are joined to.
This data is only useful for an incremental sync.
diff --git a/synapse/metrics/_legacy_exposition.py b/synapse/metrics/_legacy_exposition.py
index 6f00ff2a47..563d8cc2c6 100644
--- a/synapse/metrics/_legacy_exposition.py
+++ b/synapse/metrics/_legacy_exposition.py
@@ -34,8 +34,6 @@ from prometheus_client.core import Sample
from twisted.web.resource import Resource
from twisted.web.server import Request
-from synapse.util import caches
-
CONTENT_TYPE_LATEST = "text/plain; version=0.0.4; charset=utf-8"
@@ -93,6 +91,11 @@ LEGACY_METRIC_NAMES = {
"synapse_util_caches_response_cache_hits": "synapse_util_caches_response_cache:hits",
"synapse_util_caches_response_cache_evicted_size": "synapse_util_caches_response_cache:evicted_size",
"synapse_util_caches_response_cache": "synapse_util_caches_response_cache:total",
+ "synapse_federation_client_sent_pdu_destinations": "synapse_federation_client_sent_pdu_destinations:total",
+ "synapse_federation_client_sent_pdu_destinations_count": "synapse_federation_client_sent_pdu_destinations:count",
+ "synapse_admin_mau_current": "synapse_admin_mau:current",
+ "synapse_admin_mau_max": "synapse_admin_mau:max",
+ "synapse_admin_mau_registered_reserved_users": "synapse_admin_mau:registered_reserved_users",
}
@@ -102,11 +105,6 @@ def generate_latest(registry: CollectorRegistry, emit_help: bool = False) -> byt
by prometheus-client.
"""
- # Trigger the cache metrics to be rescraped, which updates the common
- # metrics but do not produce metrics themselves
- for collector in caches.collectors_by_name.values():
- collector.collect()
-
output = []
for metric in registry.collect():
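
[Note on the mapping above: it lets the scrape endpoint keep emitting the old colon-style metric names alongside the new ones, so existing dashboards and recording rules keep working. A minimal sketch of that idea, using plain name/value pairs rather than the real prometheus_client `Sample` objects, and only two of the mappings:]

    LEGACY_METRIC_NAMES = {
        "synapse_admin_mau_current": "synapse_admin_mau:current",
        "synapse_admin_mau_max": "synapse_admin_mau:max",
    }

    def render_sample(name: str, value: float) -> str:
        # Emit the sample under its modern name, plus a duplicate under the
        # legacy colon-style name if one is registered.
        lines = [f"{name} {value}"]
        legacy_name = LEGACY_METRIC_NAMES.get(name)
        if legacy_name is not None:
            lines.append(f"{legacy_name} {value}")
        return "\n".join(lines)

    print(render_sample("synapse_admin_mau_current", 12.0))
    # synapse_admin_mau_current 12.0
    # synapse_admin_mau:current 12.0
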
diff --git a/synapse/server.py b/synapse/server.py
index 5a99c0b344..df3a1cb405 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -341,7 +341,17 @@ class HomeServer(metaclass=abc.ABCMeta):
return domain_specific_string.domain == self.hostname
def is_mine_id(self, string: str) -> bool:
- return string.split(":", 1)[1] == self.hostname
+ """Determines whether a user ID or room alias originates from this homeserver.
+
+ Returns:
+ `True` if the hostname part of the user ID or room alias matches this
+ homeserver.
+ `False` otherwise, or if the user ID or room alias is malformed.
+ """
+ localpart_hostname = string.split(":", 1)
+ if len(localpart_hostname) < 2:
+ return False
+ return localpart_hostname[1] == self.hostname
@cache_in_self
def get_clock(self) -> Clock:
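
[Note: the new `is_mine_id` parsing rule is small enough to check by hand. A standalone sketch of the same logic — not the actual `HomeServer` method, with the hostname hard-coded for illustration — matching the behaviour exercised by the new tests in `tests/test_types.py` below:]

    def is_mine_id(string: str, hostname: str = "test") -> bool:
        # The domain of a user ID or room alias starts after the *first*
        # colon, so "@user:test:test" has the domain "test:test".
        localpart_hostname = string.split(":", 1)
        if len(localpart_hostname) < 2:
            # Malformed IDs with no colon have no hostname part at all.
            return False
        return localpart_hostname[1] == hostname

    assert is_mine_id("@user:test")            # well-formed user ID
    assert is_mine_id("#room:test")            # room alias
    assert not is_mine_id("@user")             # malformed: no colon
    assert not is_mine_id("@user:test:test")   # domain is "test:test", not "test"
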
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index b394a6658b..e881bff7fb 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -533,15 +533,14 @@ class DatabasePool:
if isinstance(self.engine, Sqlite3Engine):
self._unsafe_to_upsert_tables.add("user_directory_search")
- if self.engine.can_native_upsert:
- # Check ASAP (and then later, every 1s) to see if we have finished
- # background updates of tables that aren't safe to update.
- self._clock.call_later(
- 0.0,
- run_as_background_process,
- "upsert_safety_check",
- self._check_safe_to_upsert,
- )
+ # Check ASAP (and then later, every 1s) to see if we have finished
+ # background updates of tables that aren't safe to update.
+ self._clock.call_later(
+ 0.0,
+ run_as_background_process,
+ "upsert_safety_check",
+ self._check_safe_to_upsert,
+ )
def name(self) -> str:
"Return the name of this database"
@@ -1160,11 +1159,8 @@ class DatabasePool:
attempts = 0
while True:
try:
- # We can autocommit if we are going to use native upserts
- autocommit = (
- self.engine.can_native_upsert
- and table not in self._unsafe_to_upsert_tables
- )
+ # We can autocommit if it is safe to upsert
+ autocommit = table not in self._unsafe_to_upsert_tables
return await self.runInteraction(
desc,
@@ -1199,7 +1195,7 @@ class DatabasePool:
) -> bool:
"""
Pick the UPSERT method which works best on the platform. Either the
- native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
+ native one (Pg9.5+, SQLite >= 3.24), or fall back to an emulated method.
Args:
txn: The transaction to use.
@@ -1207,14 +1203,15 @@ class DatabasePool:
keyvalues: The unique key tables and their new values
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
- lock: True to lock the table when doing the upsert.
+ lock: True to lock the table when doing the upsert. Unused when performing
+ a native upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}
- if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
+ if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_txn_native_upsert(
txn, table, keyvalues, values, insertion_values=insertion_values
)
@@ -1365,14 +1362,12 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
- lock: True to lock the table when doing the upsert. Unused if the database engine
- supports native upserts.
+ lock: True to lock the table when doing the upsert. Unused when performing
+ a native upsert.
"""
- # We can autocommit if we are going to use native upserts
- autocommit = (
- self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
- )
+ # We can autocommit if it is safe to upsert
+ autocommit = table not in self._unsafe_to_upsert_tables
await self.runInteraction(
desc,
@@ -1406,10 +1401,10 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
- lock: True to lock the table when doing the upsert. Unused if the database engine
- supports native upserts.
+ lock: True to lock the table when doing the upsert. Unused when performing
+ a native upsert.
"""
- if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
+ if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert(
txn, table, key_names, key_values, value_names, value_values
)
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 2d7633fbd5..7270ef09da 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -129,91 +129,48 @@ class LockStore(SQLBaseStore):
now = self._clock.time_msec()
token = random_string(6)
- if self.db_pool.engine.can_native_upsert:
-
- def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
- # We take out the lock if either a) there is no row for the lock
- # already, b) the existing row has timed out, or c) the row is
- # for this instance (which means the process got killed and
- # restarted)
- sql = """
- INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
- VALUES (?, ?, ?, ?, ?)
- ON CONFLICT (lock_name, lock_key)
- DO UPDATE
- SET
- token = EXCLUDED.token,
- instance_name = EXCLUDED.instance_name,
- last_renewed_ts = EXCLUDED.last_renewed_ts
- WHERE
- worker_locks.last_renewed_ts < ?
- OR worker_locks.instance_name = EXCLUDED.instance_name
- """
- txn.execute(
- sql,
- (
- lock_name,
- lock_key,
- self._instance_name,
- token,
- now,
- now - _LOCK_TIMEOUT_MS,
- ),
- )
-
- # We only acquired the lock if we inserted or updated the table.
- return bool(txn.rowcount)
-
- did_lock = await self.db_pool.runInteraction(
- "try_acquire_lock",
- _try_acquire_lock_txn,
- # We can autocommit here as we're executing a single query, this
- # will avoid serialization errors.
- db_autocommit=True,
+ def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
+ # We take out the lock if either a) there is no row for the lock
+ # already, b) the existing row has timed out, or c) the row is
+ # for this instance (which means the process got killed and
+ # restarted)
+ sql = """
+ INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
+ VALUES (?, ?, ?, ?, ?)
+ ON CONFLICT (lock_name, lock_key)
+ DO UPDATE
+ SET
+ token = EXCLUDED.token,
+ instance_name = EXCLUDED.instance_name,
+ last_renewed_ts = EXCLUDED.last_renewed_ts
+ WHERE
+ worker_locks.last_renewed_ts < ?
+ OR worker_locks.instance_name = EXCLUDED.instance_name
+ """
+ txn.execute(
+ sql,
+ (
+ lock_name,
+ lock_key,
+ self._instance_name,
+ token,
+ now,
+ now - _LOCK_TIMEOUT_MS,
+ ),
)
- if not did_lock:
- return None
-
- else:
- # If we're on an old SQLite we emulate the above logic by first
- # clearing out any existing stale locks and then upserting.
-
- def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
- sql = """
- DELETE FROM worker_locks
- WHERE
- lock_name = ?
- AND lock_key = ?
- AND (last_renewed_ts < ? OR instance_name = ?)
- """
- txn.execute(
- sql,
- (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
- )
-
- inserted = self.db_pool.simple_upsert_txn_emulated(
- txn,
- table="worker_locks",
- keyvalues={
- "lock_name": lock_name,
- "lock_key": lock_key,
- },
- values={},
- insertion_values={
- "token": token,
- "last_renewed_ts": self._clock.time_msec(),
- "instance_name": self._instance_name,
- },
- )
-
- return inserted
- did_lock = await self.db_pool.runInteraction(
- "try_acquire_lock_emulated", _try_acquire_lock_emulated_txn
- )
+ # We only acquired the lock if we inserted or updated the table.
+ return bool(txn.rowcount)
- if not did_lock:
- return None
+ did_lock = await self.db_pool.runInteraction(
+ "try_acquire_lock",
+ _try_acquire_lock_txn,
+ # We can autocommit here as we're executing a single query, which
+ # avoids serialization errors.
+ db_autocommit=True,
+ )
+ if not did_lock:
+ return None
lock = Lock(
self._reactor,
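
[Note: the conditional `ON CONFLICT ... DO UPDATE ... WHERE` trick above is easiest to see in isolation. A runnable sketch against an in-memory SQLite database, assuming the interpreter's bundled SQLite is >= 3.24 (Synapse now requires 3.27 anyway); the table shape mirrors `worker_locks` but the lock name/key are illustrative:]

    import sqlite3
    import time

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE worker_locks (lock_name TEXT, lock_key TEXT,"
        " instance_name TEXT, token TEXT, last_renewed_ts INTEGER,"
        " UNIQUE (lock_name, lock_key))"
    )

    LOCK_TIMEOUT_MS = 2 * 60 * 1000

    def try_acquire_lock(instance_name: str, token: str) -> bool:
        now = int(time.time() * 1000)
        cur = conn.execute(
            """
            INSERT INTO worker_locks VALUES ('name', 'key', ?, ?, ?)
            ON CONFLICT (lock_name, lock_key)
            DO UPDATE SET
                token = EXCLUDED.token,
                instance_name = EXCLUDED.instance_name,
                last_renewed_ts = EXCLUDED.last_renewed_ts
            WHERE
                worker_locks.last_renewed_ts < ?
                OR worker_locks.instance_name = EXCLUDED.instance_name
            """,
            (instance_name, token, now, now - LOCK_TIMEOUT_MS),
        )
        # rowcount is 1 if we inserted or updated (lock acquired), and 0 if
        # the WHERE clause filtered the update out (someone else holds a
        # live lock); this matches the `bool(txn.rowcount)` check above.
        return cur.rowcount == 1

    assert try_acquire_lock("worker1", "aaa")      # no row yet: insert wins
    assert not try_acquire_lock("worker2", "bbb")  # worker1 holds a live lock
    assert try_acquire_lock("worker1", "ccc")      # same instance may re-take it
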
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 124c70ad37..3838409519 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -812,7 +812,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
# FIXME: This shouldn't invalidate the whole cache
txn.call_after(self._get_linearized_receipts_for_room.invalidate, (room_id,))
- self.db_pool.simple_delete_txn(
+ self.db_pool.simple_upsert_txn(
txn,
table="receipts_graph",
keyvalues={
@@ -820,17 +820,13 @@ class ReceiptsWorkerStore(SQLBaseStore):
"receipt_type": receipt_type,
"user_id": user_id,
},
- )
- self.db_pool.simple_insert_txn(
- txn,
- table="receipts_graph",
values={
- "room_id": room_id,
- "receipt_type": receipt_type,
- "user_id": user_id,
"event_ids": json_encoder.encode(event_ids),
"data": json_encoder.encode(data),
},
+ # receipts_graph has a unique constraint on
+ # (user_id, room_id, receipt_type), so no need to lock
+ lock=False,
)
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 7fb9c801da..ac821878b0 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -175,6 +175,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
"is_guest",
"admin",
"consent_version",
+ "consent_ts",
"consent_server_notice_sent",
"appservice_id",
"creation_ts",
@@ -2227,7 +2228,10 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
txn,
table="users",
keyvalues={"name": user_id},
- updatevalues={"consent_version": consent_version},
+ updatevalues={
+ "consent_version": consent_version,
+ "consent_ts": self._clock.time_msec(),
+ },
)
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index a77e49dc66..6e1ff5626b 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -192,8 +192,15 @@ class RoomMemberWorkerStore(EventsWorkerStore):
(aka. with the lowest depth). This is done to match the sort in
`get_current_hosts_in_room()` and so we can re-use the cache but it's
not horrible to have here either.
- """
+ Uses `m.room.member`s in the room state at the current forward extremities to
+ determine which users are in the room.
+
+ Will return inaccurate results for rooms with partial state, since the state for
+ the forward extremities of those rooms will exclude most members. We may also
+ calculate room state incorrectly for such rooms and believe that a member is or
+ is not in the room when the opposite is true.
+ """
return await self.db_pool.runInteraction(
"get_users_in_room", self.get_users_in_room_txn, room_id
)
@@ -1022,6 +1029,14 @@ class RoomMemberWorkerStore(EventsWorkerStore):
longest is good because they're most likely to have anything we ask
about.
+ Uses `m.room.member`s in the room state at the current forward extremities to
+ determine which hosts are in the room.
+
+ Will return inaccurate results for rooms with partial state, since the state for
+ the forward extremities of those rooms will exclude most members. We may also
+ calculate room state incorrectly for such rooms and believe that a host is or
+ is not in the room when the opposite is true.
+
Returns:
Returns a list of servers sorted by longest in the room first. (aka.
sorted by join with the lowest depth first).
@@ -1044,6 +1059,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
# We use a `Set` just for fast lookups
domain_set: Set[str] = set()
for u in users:
+ if ":" not in u:
+ continue
domain = get_domain_from_id(u)
if domain not in domain_set:
domain_set.add(domain)
@@ -1077,7 +1094,8 @@ class RoomMemberWorkerStore(EventsWorkerStore):
ORDER BY min(e.depth) ASC;
"""
txn.execute(sql, (room_id,))
- return [d for d, in txn]
+ # `server_domain` will be `NULL` for malformed MXIDs with no colons.
+ return [d for d, in txn if d is not None]
return await self.db_pool.runInteraction(
"get_current_hosts_in_room", get_current_hosts_in_room_txn
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index b4c652acf3..356d4ca788 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -446,59 +446,41 @@ class StatsStore(StateDeltasStore):
absolutes: Absolute (set) fields
additive_relatives: Fields that will be added onto if existing row present.
"""
- if self.database_engine.can_native_upsert:
- absolute_updates = [
- "%(field)s = EXCLUDED.%(field)s" % {"field": field}
- for field in absolutes.keys()
- ]
-
- relative_updates = [
- "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
- % {"table": table, "field": field}
- for field in additive_relatives.keys()
- ]
-
- insert_cols = []
- qargs = []
-
- for (key, val) in chain(
- keyvalues.items(), absolutes.items(), additive_relatives.items()
- ):
- insert_cols.append(key)
- qargs.append(val)
+ absolute_updates = [
+ "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+ for field in absolutes.keys()
+ ]
+
+ relative_updates = [
+ "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
+ % {"table": table, "field": field}
+ for field in additive_relatives.keys()
+ ]
+
+ insert_cols = []
+ qargs = []
+
+ for (key, val) in chain(
+ keyvalues.items(), absolutes.items(), additive_relatives.items()
+ ):
+ insert_cols.append(key)
+ qargs.append(val)
+
+ sql = """
+ INSERT INTO %(table)s (%(insert_cols_cs)s)
+ VALUES (%(insert_vals_qs)s)
+ ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+ """ % {
+ "table": table,
+ "insert_cols_cs": ", ".join(insert_cols),
+ "insert_vals_qs": ", ".join(
+ ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+ ),
+ "key_columns": ", ".join(keyvalues),
+ "updates": ", ".join(chain(absolute_updates, relative_updates)),
+ }
- sql = """
- INSERT INTO %(table)s (%(insert_cols_cs)s)
- VALUES (%(insert_vals_qs)s)
- ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
- """ % {
- "table": table,
- "insert_cols_cs": ", ".join(insert_cols),
- "insert_vals_qs": ", ".join(
- ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
- ),
- "key_columns": ", ".join(keyvalues),
- "updates": ", ".join(chain(absolute_updates, relative_updates)),
- }
-
- txn.execute(sql, qargs)
- else:
- self.database_engine.lock_table(txn, table)
- retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
- current_row = self.db_pool.simple_select_one_txn(
- txn, table, keyvalues, retcols, allow_none=True
- )
- if current_row is None:
- merged_dict = {**keyvalues, **absolutes, **additive_relatives}
- self.db_pool.simple_insert_txn(txn, table, merged_dict)
- else:
- for (key, val) in additive_relatives.items():
- if current_row[key] is None:
- current_row[key] = val
- else:
- current_row[key] += val
- current_row.update(absolutes)
- self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
+ txn.execute(sql, qargs)
async def _calculate_and_set_initial_state_for_room(self, room_id: str) -> None:
"""Calculate and insert an entry into room_stats_current.
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index ba79e19f7f..f8c6877ee8 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -221,25 +221,15 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
retry_interval: how long until next retry in ms
"""
- if self.database_engine.can_native_upsert:
- await self.db_pool.runInteraction(
- "set_destination_retry_timings",
- self._set_destination_retry_timings_native,
- destination,
- failure_ts,
- retry_last_ts,
- retry_interval,
- db_autocommit=True, # Safe as its a single upsert
- )
- else:
- await self.db_pool.runInteraction(
- "set_destination_retry_timings",
- self._set_destination_retry_timings_emulated,
- destination,
- failure_ts,
- retry_last_ts,
- retry_interval,
- )
+ await self.db_pool.runInteraction(
+ "set_destination_retry_timings",
+ self._set_destination_retry_timings_native,
+ destination,
+ failure_ts,
+ retry_last_ts,
+ retry_interval,
+ db_autocommit=True, # Safe as it's a single upsert
+ )
def _set_destination_retry_timings_native(
self,
@@ -249,8 +239,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
retry_last_ts: int,
retry_interval: int,
) -> None:
- assert self.database_engine.can_native_upsert
-
# Upsert retry time interval if retry_interval is zero (i.e. we're
# resetting it) or greater than the existing retry interval.
#
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 971ff82693..0d16a419a4 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -45,14 +45,6 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
- def can_native_upsert(self) -> bool:
- """
- Do we support native UPSERTs?
- """
- ...
-
- @property
- @abc.abstractmethod
def supports_using_any_list(self) -> bool:
"""
Do we support using `a = ANY(?)` and passing a list
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 517f9d5f98..7f7d006ac2 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -159,13 +159,6 @@ class PostgresEngine(BaseDatabaseEngine[psycopg2.extensions.connection]):
db_conn.commit()
@property
- def can_native_upsert(self) -> bool:
- """
- Can we use native UPSERTs?
- """
- return True
-
- @property
def supports_using_any_list(self) -> bool:
"""Do we support using `a = ANY(?)` and passing a list"""
return True
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 621f2c5efe..095ae0a096 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -49,14 +49,6 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
return True
@property
- def can_native_upsert(self) -> bool:
- """
- Do we support native UPSERTs? This requires SQLite3 3.24+, plus some
- more work we haven't done yet to tell what was inserted vs updated.
- """
- return sqlite3.sqlite_version_info >= (3, 24, 0)
-
- @property
def supports_using_any_list(self) -> bool:
"""Do we support using `a = ANY(?)` and passing a list"""
return False
@@ -70,12 +62,11 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
self, db_conn: sqlite3.Connection, allow_outdated_version: bool = False
) -> None:
if not allow_outdated_version:
- version = sqlite3.sqlite_version_info
# Synapse is untested against older SQLite versions, and we don't want
# to let users upgrade to a version of Synapse with broken support for their
# sqlite version, because it risks leaving them with a half-upgraded db.
- if version < (3, 22, 0):
- raise RuntimeError("Synapse requires sqlite 3.22 or above.")
+ if sqlite3.sqlite_version_info < (3, 27, 0):
+ raise RuntimeError("Synapse requires sqlite 3.27 or above.")
def check_new_database(self, txn: Cursor) -> None:
"""Gets called when setting up a brand new database. This allows us to
diff --git a/synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql b/synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql
new file mode 100644
index 0000000000..609eb1750f
--- /dev/null
+++ b/synapse/storage/schema/main/delta/72/06add_consent_ts_to_users.sql
@@ -0,0 +1,16 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE users ADD consent_ts bigint;
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index d4a2b77c29..35c0be08b0 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -20,9 +20,11 @@ from sys import intern
from typing import Any, Callable, Dict, List, Optional, Sized, TypeVar
import attr
+from prometheus_client import REGISTRY
from prometheus_client.core import Gauge
from synapse.config.cache import add_resizable_cache
+from synapse.util.metrics import DynamicCollectorRegistry
logger = logging.getLogger(__name__)
@@ -30,27 +32,62 @@ logger = logging.getLogger(__name__)
# Whether to track estimated memory usage of the LruCaches.
TRACK_MEMORY_USAGE = False
+# We track cache metrics in a special registry that lets us update the metrics
+# just before they are returned from the scrape endpoint.
+CACHE_METRIC_REGISTRY = DynamicCollectorRegistry()
caches_by_name: Dict[str, Sized] = {}
-collectors_by_name: Dict[str, "CacheMetric"] = {}
-cache_size = Gauge("synapse_util_caches_cache_size", "", ["name"])
-cache_hits = Gauge("synapse_util_caches_cache_hits", "", ["name"])
-cache_evicted = Gauge("synapse_util_caches_cache_evicted_size", "", ["name", "reason"])
-cache_total = Gauge("synapse_util_caches_cache", "", ["name"])
-cache_max_size = Gauge("synapse_util_caches_cache_max_size", "", ["name"])
+cache_size = Gauge(
+ "synapse_util_caches_cache_size", "", ["name"], registry=CACHE_METRIC_REGISTRY
+)
+cache_hits = Gauge(
+ "synapse_util_caches_cache_hits", "", ["name"], registry=CACHE_METRIC_REGISTRY
+)
+cache_evicted = Gauge(
+ "synapse_util_caches_cache_evicted_size",
+ "",
+ ["name", "reason"],
+ registry=CACHE_METRIC_REGISTRY,
+)
+cache_total = Gauge(
+ "synapse_util_caches_cache", "", ["name"], registry=CACHE_METRIC_REGISTRY
+)
+cache_max_size = Gauge(
+ "synapse_util_caches_cache_max_size", "", ["name"], registry=CACHE_METRIC_REGISTRY
+)
cache_memory_usage = Gauge(
"synapse_util_caches_cache_size_bytes",
"Estimated memory usage of the caches",
["name"],
+ registry=CACHE_METRIC_REGISTRY,
)
-response_cache_size = Gauge("synapse_util_caches_response_cache_size", "", ["name"])
-response_cache_hits = Gauge("synapse_util_caches_response_cache_hits", "", ["name"])
+response_cache_size = Gauge(
+ "synapse_util_caches_response_cache_size",
+ "",
+ ["name"],
+ registry=CACHE_METRIC_REGISTRY,
+)
+response_cache_hits = Gauge(
+ "synapse_util_caches_response_cache_hits",
+ "",
+ ["name"],
+ registry=CACHE_METRIC_REGISTRY,
+)
response_cache_evicted = Gauge(
- "synapse_util_caches_response_cache_evicted_size", "", ["name", "reason"]
+ "synapse_util_caches_response_cache_evicted_size",
+ "",
+ ["name", "reason"],
+ registry=CACHE_METRIC_REGISTRY,
)
-response_cache_total = Gauge("synapse_util_caches_response_cache", "", ["name"])
+response_cache_total = Gauge(
+ "synapse_util_caches_response_cache", "", ["name"], registry=CACHE_METRIC_REGISTRY
+)
+
+
+# Register our custom cache metrics registry with the global registry
+REGISTRY.register(CACHE_METRIC_REGISTRY)
class EvictionReason(Enum):
@@ -168,9 +205,8 @@ def register_cache(
add_resizable_cache(cache_name, resize_callback)
metric = CacheMetric(cache, cache_type, cache_name, collect_callback)
- metric_name = "cache_%s_%s" % (cache_type, cache_name)
caches_by_name[cache_name] = cache
- collectors_by_name[metric_name] = metric
+ CACHE_METRIC_REGISTRY.register_hook(metric.collect)
return metric
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index bc3b4938ea..9687120ebf 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -15,9 +15,9 @@
import logging
from functools import wraps
from types import TracebackType
-from typing import Awaitable, Callable, Optional, Type, TypeVar
+from typing import Awaitable, Callable, Generator, List, Optional, Type, TypeVar
-from prometheus_client import Counter
+from prometheus_client import CollectorRegistry, Counter, Metric
from typing_extensions import Concatenate, ParamSpec, Protocol
from synapse.logging.context import (
@@ -208,3 +208,33 @@ class Measure:
metrics.real_time_sum += duration
# TODO: Add other in flight metrics.
+
+
+class DynamicCollectorRegistry(CollectorRegistry):
+ """
+ Custom Prometheus Collector registry that calls registered hooks before
+ each collection, allowing metrics to be updated on demand.
+
+ Don't forget to register this registry with the main registry!
+ """
+
+ def __init__(self) -> None:
+ super().__init__()
+ self._pre_update_hooks: List[Callable[[], None]] = []
+
+ def collect(self) -> Generator[Metric, None, None]:
+ """
+ Collects metrics, calling pre-update hooks first.
+ """
+
+ for pre_update_hook in self._pre_update_hooks:
+ pre_update_hook()
+
+ yield from super().collect()
+
+ def register_hook(self, hook: Callable[[], None]) -> None:
+ """
+ Registers a hook that is called before metric collection.
+ """
+
+ self._pre_update_hooks.append(hook)
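
[Note: a minimal usage sketch for the class above, mirroring what `synapse/util/caches/__init__.py` now does with `CACHE_METRIC_REGISTRY`; the gauge name and hook body are illustrative:]

    from prometheus_client import REGISTRY, Gauge, generate_latest

    from synapse.util.metrics import DynamicCollectorRegistry

    lazy_registry = DynamicCollectorRegistry()
    cache_size = Gauge(
        "example_cache_size", "Items in the cache", ["name"], registry=lazy_registry
    )

    def update_cache_metrics() -> None:
        # Runs on every scrape, so the gauge is only refreshed when asked for.
        cache_size.labels("my_cache").set(42)

    lazy_registry.register_hook(update_cache_metrics)

    # A CollectorRegistry is itself a collector, so the lazy registry can be
    # exposed through the default registry.
    REGISTRY.register(lazy_registry)

    print(generate_latest(REGISTRY).decode("ascii"))
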
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index 1afd082707..ec5ccf6fca 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -2580,6 +2580,7 @@ class UserRestTestCase(unittest.HomeserverTestCase):
self.assertIn("appservice_id", content)
self.assertIn("consent_server_notice_sent", content)
self.assertIn("consent_version", content)
+ self.assertIn("consent_ts", content)
self.assertIn("external_ids", content)
# This key was removed intentionally. Ensure it is not accidentally re-included.
diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py
index cce8e75c74..40e58f8199 100644
--- a/tests/storage/test_base.py
+++ b/tests/storage/test_base.py
@@ -54,7 +54,6 @@ class SQLBaseStoreTestCase(unittest.TestCase):
sqlite_config = {"name": "sqlite3"}
engine = create_engine(sqlite_config)
fake_engine = Mock(wraps=engine)
- fake_engine.can_native_upsert = False
fake_engine.in_transaction.return_value = False
db = DatabasePool(Mock(), Mock(config=sqlite_config), fake_engine)
diff --git a/tests/storage/test_registration.py b/tests/storage/test_registration.py
index a49ac1525e..853a93afab 100644
--- a/tests/storage/test_registration.py
+++ b/tests/storage/test_registration.py
@@ -11,15 +11,18 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import UserTypes
from synapse.api.errors import ThreepidValidationError
+from synapse.server import HomeServer
+from synapse.util import Clock
from tests.unittest import HomeserverTestCase
class RegistrationStoreTestCase(HomeserverTestCase):
- def prepare(self, reactor, clock, hs):
+ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.store = hs.get_datastores().main
self.user_id = "@my-user:test"
@@ -27,7 +30,7 @@ class RegistrationStoreTestCase(HomeserverTestCase):
self.pwhash = "{xx1}123456789"
self.device_id = "akgjhdjklgshg"
- def test_register(self):
+ def test_register(self) -> None:
self.get_success(self.store.register_user(self.user_id, self.pwhash))
self.assertEqual(
@@ -38,6 +41,7 @@ class RegistrationStoreTestCase(HomeserverTestCase):
"admin": 0,
"is_guest": 0,
"consent_version": None,
+ "consent_ts": None,
"consent_server_notice_sent": None,
"appservice_id": None,
"creation_ts": 0,
@@ -48,7 +52,20 @@ class RegistrationStoreTestCase(HomeserverTestCase):
(self.get_success(self.store.get_user_by_id(self.user_id))),
)
- def test_add_tokens(self):
+ def test_consent(self) -> None:
+ self.get_success(self.store.register_user(self.user_id, self.pwhash))
+ before_consent = self.clock.time_msec()
+ self.reactor.advance(5)
+ self.get_success(self.store.user_set_consent_version(self.user_id, "1"))
+ self.reactor.advance(5)
+
+ user = self.get_success(self.store.get_user_by_id(self.user_id))
+ assert user
+ self.assertEqual(user["consent_version"], "1")
+ self.assertGreater(user["consent_ts"], before_consent)
+ self.assertLess(user["consent_ts"], self.clock.time_msec())
+
+ def test_add_tokens(self) -> None:
self.get_success(self.store.register_user(self.user_id, self.pwhash))
self.get_success(
self.store.add_access_token_to_user(
@@ -58,11 +75,12 @@ class RegistrationStoreTestCase(HomeserverTestCase):
result = self.get_success(self.store.get_user_by_access_token(self.tokens[1]))
+ assert result
self.assertEqual(result.user_id, self.user_id)
self.assertEqual(result.device_id, self.device_id)
self.assertIsNotNone(result.token_id)
- def test_user_delete_access_tokens(self):
+ def test_user_delete_access_tokens(self) -> None:
# add some tokens
self.get_success(self.store.register_user(self.user_id, self.pwhash))
self.get_success(
@@ -87,6 +105,7 @@ class RegistrationStoreTestCase(HomeserverTestCase):
# check the one not associated with the device was not deleted
user = self.get_success(self.store.get_user_by_access_token(self.tokens[0]))
+ assert user
self.assertEqual(self.user_id, user.user_id)
# now delete the rest
@@ -95,11 +114,11 @@ class RegistrationStoreTestCase(HomeserverTestCase):
user = self.get_success(self.store.get_user_by_access_token(self.tokens[0]))
self.assertIsNone(user, "access token was not deleted without device_id")
- def test_is_support_user(self):
+ def test_is_support_user(self) -> None:
TEST_USER = "@test:test"
SUPPORT_USER = "@support:test"
- res = self.get_success(self.store.is_support_user(None))
+ res = self.get_success(self.store.is_support_user(None)) # type: ignore[arg-type]
self.assertFalse(res)
self.get_success(
self.store.register_user(user_id=TEST_USER, password_hash=None)
@@ -115,7 +134,7 @@ class RegistrationStoreTestCase(HomeserverTestCase):
res = self.get_success(self.store.is_support_user(SUPPORT_USER))
self.assertTrue(res)
- def test_3pid_inhibit_invalid_validation_session_error(self):
+ def test_3pid_inhibit_invalid_validation_session_error(self) -> None:
"""Tests that enabling the configuration option to inhibit 3PID errors on
/requestToken also inhibits validation errors caused by an unknown session ID.
"""
diff --git a/tests/test_types.py b/tests/test_types.py
index d8d82a517e..1111169384 100644
--- a/tests/test_types.py
+++ b/tests/test_types.py
@@ -13,11 +13,35 @@
# limitations under the License.
from synapse.api.errors import SynapseError
-from synapse.types import RoomAlias, UserID, map_username_to_mxid_localpart
+from synapse.types import (
+ RoomAlias,
+ UserID,
+ get_domain_from_id,
+ get_localpart_from_id,
+ map_username_to_mxid_localpart,
+)
from tests import unittest
+class IsMineIDTests(unittest.HomeserverTestCase):
+ def test_is_mine_id(self) -> None:
+ self.assertTrue(self.hs.is_mine_id("@user:test"))
+ self.assertTrue(self.hs.is_mine_id("#room:test"))
+ self.assertTrue(self.hs.is_mine_id("invalid:test"))
+
+ self.assertFalse(self.hs.is_mine_id("@user:test\0"))
+ self.assertFalse(self.hs.is_mine_id("@user"))
+
+ def test_two_colons(self) -> None:
+ """Test handling of IDs containing more than one colon."""
+ # The domain starts after the first colon.
+ # These functions must interpret things consistently.
+ self.assertFalse(self.hs.is_mine_id("@user:test:test"))
+ self.assertEqual("user", get_localpart_from_id("@user:test:test"))
+ self.assertEqual("test:test", get_domain_from_id("@user:test:test"))
+
+
class UserIDTestCase(unittest.HomeserverTestCase):
def test_parse(self):
user = UserID.from_string("@1234abcd:test")
|