diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index d53181deb1..1b511890aa 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -790,10 +790,6 @@ class FederationSenderHandler:
send_queue.process_rows_for_federation(self.federation_sender, rows)
await self.update_token(token)
- # We also need to poke the federation sender when new events happen
- elif stream_name == "events":
- self.federation_sender.notify_new_events(token)
-
# ... and when new receipts happen
elif stream_name == ReceiptsStream.NAME:
await self._on_new_receipts(rows)
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index b6c47be646..df4f950fec 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -97,32 +97,37 @@ class EventBuilder:
def is_state(self):
return self._state_key is not None
- async def build(self, prev_event_ids: List[str]) -> EventBase:
+ async def build(
+ self, prev_event_ids: List[str], auth_event_ids: Optional[List[str]]
+ ) -> EventBase:
"""Transform into a fully signed and hashed event
Args:
prev_event_ids: The event IDs to use as the prev events
+ auth_event_ids: The event IDs to use as the auth events.
+ Should normally be set to None, which will cause them to be calculated
+ based on the room state at the prev_events.
Returns:
The signed and hashed event.
"""
-
- state_ids = await self._state.get_current_state_ids(
- self.room_id, prev_event_ids
- )
- auth_ids = self._auth.compute_auth_events(self, state_ids)
+ if auth_event_ids is None:
+ state_ids = await self._state.get_current_state_ids(
+ self.room_id, prev_event_ids
+ )
+ auth_event_ids = self._auth.compute_auth_events(self, state_ids)
format_version = self.room_version.event_format
if format_version == EventFormatVersions.V1:
# The types of auth/prev events changes between event versions.
auth_events = await self._store.add_event_hashes(
- auth_ids
+ auth_event_ids
) # type: Union[List[str], List[Tuple[str, Dict[str, str]]]]
prev_events = await self._store.add_event_hashes(
prev_event_ids
) # type: Union[List[str], List[Tuple[str, Dict[str, str]]]]
else:
- auth_events = auth_ids
+ auth_events = auth_event_ids
prev_events = prev_event_ids
old_depth = await self._store.get_max_depth_of(prev_event_ids)
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index 9df35b54ba..5f9af8529b 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -83,6 +83,9 @@ class EventValidator:
Args:
event (FrozenEvent): The event to validate.
"""
+ if not event.is_state():
+ raise SynapseError(code=400, msg="must be a state event")
+
min_lifetime = event.content.get("min_lifetime")
max_lifetime = event.content.get("max_lifetime")
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 8e46957d15..5f1bf492c1 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -188,7 +188,7 @@ class FederationRemoteSendQueue:
for key in keys[:i]:
del self.edus[key]
- def notify_new_events(self, current_id):
+ def notify_new_events(self, max_token):
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index e33b29a42c..604cfd1935 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -40,7 +40,7 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import ReadReceipt
+from synapse.types import ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__)
@@ -154,10 +154,15 @@ class FederationSender:
self._per_destination_queues[destination] = queue
return queue
- def notify_new_events(self, current_id: int) -> None:
+ def notify_new_events(self, max_token: RoomStreamToken) -> None:
"""This gets called when we have some new events we might want to
send out to other servers.
"""
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ current_id = max_token.stream
+
self._last_poked_id = max(current_id, self._last_poked_id)
if self._is_processing:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 9d4e87dad6..c8d5e58035 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -27,6 +27,7 @@ from synapse.metrics import (
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import RoomStreamToken
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -47,15 +48,17 @@ class ApplicationServicesHandler:
self.current_max = 0
self.is_processing = False
- async def notify_interested_services(self, current_id):
+ async def notify_interested_services(self, max_token: RoomStreamToken):
"""Notifies (pushes) all application services interested in this event.
Pushing is done asynchronously, so this method won't block for any
prolonged length of time.
-
- Args:
- current_id(int): The current maximum ID.
"""
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ current_id = max_token.stream
+
services = self.store.get_app_services()
if not services or not self.notify_appservices:
return
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c52e6824d3..f18f882596 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -437,9 +437,9 @@ class EventCreationHandler:
self,
requester: Requester,
event_dict: dict,
- token_id: Optional[str] = None,
txn_id: Optional[str] = None,
prev_event_ids: Optional[List[str]] = None,
+ auth_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
) -> Tuple[EventBase, EventContext]:
"""
@@ -453,13 +453,18 @@ class EventCreationHandler:
Args:
requester
event_dict: An entire event
- token_id
txn_id
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
If None, they will be requested from the database.
+
+ auth_event_ids:
+ The event ids to use as the auth_events for the new event.
+ Should normally be left as None, which will cause them to be calculated
+ based on the room state at the prev_events.
+
require_consent: Whether to check if the requester has
consented to the privacy policy.
Raises:
@@ -511,14 +516,17 @@ class EventCreationHandler:
if require_consent and not is_exempt:
await self.assert_accepted_privacy_policy(requester)
- if token_id is not None:
- builder.internal_metadata.token_id = token_id
+ if requester.access_token_id is not None:
+ builder.internal_metadata.token_id = requester.access_token_id
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id
event, context = await self.create_new_client_event(
- builder=builder, requester=requester, prev_event_ids=prev_event_ids,
+ builder=builder,
+ requester=requester,
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
)
# In an ideal world we wouldn't need the second part of this condition. However,
@@ -726,7 +734,7 @@ class EventCreationHandler:
return event, event.internal_metadata.stream_ordering
event, context = await self.create_event(
- requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
+ requester, event_dict, txn_id=txn_id
)
assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
@@ -757,6 +765,7 @@ class EventCreationHandler:
builder: EventBuilder,
requester: Optional[Requester] = None,
prev_event_ids: Optional[List[str]] = None,
+ auth_event_ids: Optional[List[str]] = None,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
@@ -769,6 +778,11 @@ class EventCreationHandler:
If None, they will be requested from the database.
+ auth_event_ids:
+ The event ids to use as the auth_events for the new event.
+ Should normally be left as None, which will cause them to be calculated
+ based on the room state at the prev_events.
+
Returns:
Tuple of created event, context
"""
@@ -790,7 +804,9 @@ class EventCreationHandler:
builder.type == EventTypes.Create or len(prev_event_ids) > 0
), "Attempting to create an event with no prev_events"
- event = await builder.build(prev_event_ids=prev_event_ids)
+ event = await builder.build(
+ prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids
+ )
context = await self.state.compute_event_context(event)
if requester:
context.app_service = requester.app_service
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 93ed51063a..ec300d8877 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -214,7 +214,6 @@ class RoomCreationHandler(BaseHandler):
"replacement_room": new_room_id,
},
},
- token_id=requester.access_token_id,
)
old_room_version = await self.store.get_room_version_id(old_room_id)
await self.auth.check_from_context(
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0080eeaf8d..ec784030e9 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,12 +17,10 @@ import abc
import logging
import random
from http import HTTPStatus
-from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, Union
-
-from unpaddedbase64 import encode_base64
+from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
from synapse import types
-from synapse.api.constants import MAX_DEPTH, AccountDataTypes, EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
@@ -31,12 +29,8 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.api.ratelimiting import Ratelimiter
-from synapse.api.room_versions import EventFormatVersions
-from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.events import EventBase
-from synapse.events.builder import create_local_event_from_event_dict
from synapse.events.snapshot import EventContext
-from synapse.events.validator import EventValidator
from synapse.storage.roommember import RoomsForUser
from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
from synapse.util.async_helpers import Linearizer
@@ -193,7 +187,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# For backwards compatibility:
"membership": membership,
},
- token_id=requester.access_token_id,
txn_id=txn_id,
prev_event_ids=prev_event_ids,
require_consent=require_consent,
@@ -1133,31 +1126,10 @@ class RoomMemberMasterHandler(RoomMemberHandler):
room_id = invite_event.room_id
target_user = invite_event.state_key
- room_version = await self.store.get_room_version(room_id)
content["membership"] = Membership.LEAVE
- # the auth events for the new event are the same as that of the invite, plus
- # the invite itself.
- #
- # the prev_events are just the invite.
- invite_hash = invite_event.event_id # type: Union[str, Tuple]
- if room_version.event_format == EventFormatVersions.V1:
- alg, h = compute_event_reference_hash(invite_event)
- invite_hash = (invite_event.event_id, {alg: encode_base64(h)})
-
- auth_events = tuple(invite_event.auth_events) + (invite_hash,)
- prev_events = (invite_hash,)
-
- # we cap depth of generated events, to ensure that they are not
- # rejected by other servers (and so that they can be persisted in
- # the db)
- depth = min(invite_event.depth + 1, MAX_DEPTH)
-
event_dict = {
- "depth": depth,
- "auth_events": auth_events,
- "prev_events": prev_events,
"type": EventTypes.Member,
"room_id": room_id,
"sender": target_user,
@@ -1165,24 +1137,23 @@ class RoomMemberMasterHandler(RoomMemberHandler):
"state_key": target_user,
}
- event = create_local_event_from_event_dict(
- clock=self.clock,
- hostname=self.hs.hostname,
- signing_key=self.hs.signing_key,
- room_version=room_version,
- event_dict=event_dict,
+ # the auth events for the new event are the same as that of the invite, plus
+ # the invite itself.
+ #
+ # the prev_events are just the invite.
+ prev_event_ids = [invite_event.event_id]
+ auth_event_ids = invite_event.auth_event_ids() + prev_event_ids
+
+ event, context = await self.event_creation_handler.create_event(
+ requester,
+ event_dict,
+ txn_id=txn_id,
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
)
event.internal_metadata.outlier = True
event.internal_metadata.out_of_band_membership = True
- if txn_id is not None:
- event.internal_metadata.txn_id = txn_id
- if requester.access_token_id is not None:
- event.internal_metadata.token_id = requester.access_token_id
-
- EventValidator().validate_new(event, self.config)
- context = await self.state_handler.compute_event_context(event)
- context.app_service = requester.app_service
result_event = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[UserID.from_string(target_user)],
)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 13adeed01e..51c830c91e 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -319,19 +319,19 @@ class Notifier:
)
if self.federation_sender:
- self.federation_sender.notify_new_events(max_room_stream_token.stream)
+ self.federation_sender.notify_new_events(max_room_stream_token)
async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
try:
await self.appservice_handler.notify_interested_services(
- max_room_stream_token.stream
+ max_room_stream_token
)
except Exception:
logger.exception("Error notifying application services of event")
async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
- await self._pusher_pool.on_new_notifications(max_room_stream_token.stream)
+ await self._pusher_pool.on_new_notifications(max_room_stream_token)
except Exception:
logger.exception("Error pusher pool of event")
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 28bd8ab748..c6763971ee 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -18,6 +18,7 @@ import logging
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import RoomStreamToken
logger = logging.getLogger(__name__)
@@ -91,7 +92,12 @@ class EmailPusher:
pass
self.timed_call = None
- def on_new_notifications(self, max_stream_ordering):
+ def on_new_notifications(self, max_token: RoomStreamToken):
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ max_stream_ordering = max_token.stream
+
if self.max_stream_ordering:
self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 26706bf3e1..793d0db2d9 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes
from synapse.logging import opentracing
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
+from synapse.types import RoomStreamToken
from . import push_rule_evaluator, push_tools
@@ -114,7 +115,12 @@ class HttpPusher:
if should_check_for_notifs:
self._start_processing()
- def on_new_notifications(self, max_stream_ordering):
+ def on_new_notifications(self, max_token: RoomStreamToken):
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ max_stream_ordering = max_token.stream
+
self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering or 0
)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 76150e117b..0080c68ce2 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -24,6 +24,7 @@ from synapse.push import PusherConfigException
from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher
from synapse.push.pusher import PusherFactory
+from synapse.types import RoomStreamToken
from synapse.util.async_helpers import concurrently_execute
if TYPE_CHECKING:
@@ -186,11 +187,16 @@ class PusherPool:
)
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
- async def on_new_notifications(self, max_stream_id: int):
+ async def on_new_notifications(self, max_token: RoomStreamToken):
if not self.pushers:
# nothing to do here.
return
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ max_stream_id = max_token.stream
+
if max_stream_id < self._last_room_stream_id_seen:
# Nothing to do
return
@@ -214,7 +220,7 @@ class PusherPool:
if u in self.pushers:
for p in self.pushers[u].values():
- p.on_new_notifications(max_stream_id)
+ p.on_new_notifications(max_token)
except Exception:
logger.exception("Exception in pusher on_new_notifications")
diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py
index b686cd671f..e7fcd2b1ff 100644
--- a/synapse/rest/client/v1/profile.py
+++ b/synapse/rest/client/v1/profile.py
@@ -59,7 +59,9 @@ class ProfileDisplaynameRestServlet(RestServlet):
try:
new_name = content["displayname"]
except Exception:
- return 400, "Unable to parse name"
+ raise SynapseError(
+ code=400, msg="Unable to parse name", errcode=Codes.BAD_JSON,
+ )
await self.profile_handler.set_displayname(user, requester, new_name, is_admin)
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 0ba3a025cf..763722d6bc 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -893,6 +893,12 @@ class DatabasePool:
attempts = 0
while True:
try:
+ # We can autocommit if we are going to use native upserts
+ autocommit = (
+ self.engine.can_native_upsert
+ and table not in self._unsafe_to_upsert_tables
+ )
+
return await self.runInteraction(
desc,
self.simple_upsert_txn,
@@ -901,6 +907,7 @@ class DatabasePool:
values,
insertion_values,
lock=lock,
+ db_autocommit=autocommit,
)
except self.engine.module.IntegrityError as e:
attempts += 1
@@ -1063,6 +1070,43 @@ class DatabasePool:
)
txn.execute(sql, list(allvalues.values()))
+ async def simple_upsert_many(
+ self,
+ table: str,
+ key_names: Collection[str],
+ key_values: Collection[Iterable[Any]],
+ value_names: Collection[str],
+ value_values: Iterable[Iterable[Any]],
+ desc: str,
+ ) -> None:
+ """
+ Upsert, many times.
+
+ Args:
+ table: The table to upsert into
+ key_names: The key column names.
+ key_values: A list of each row's key column values.
+ value_names: The value column names
+ value_values: A list of each row's value column values.
+ Ignored if value_names is empty.
+ """
+
+ # We can autocommit if we are going to use native upserts
+ autocommit = (
+ self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
+ )
+
+ return await self.runInteraction(
+ desc,
+ self.simple_upsert_many_txn,
+ table,
+ key_names,
+ key_values,
+ value_names,
+ value_values,
+ db_autocommit=autocommit,
+ )
+
def simple_upsert_many_txn(
self,
txn: LoggingTransaction,
@@ -1214,7 +1258,13 @@ class DatabasePool:
desc: description of the transaction, for logging and metrics
"""
return await self.runInteraction(
- desc, self.simple_select_one_txn, table, keyvalues, retcols, allow_none
+ desc,
+ self.simple_select_one_txn,
+ table,
+ keyvalues,
+ retcols,
+ allow_none,
+ db_autocommit=True,
)
@overload
@@ -1265,6 +1315,7 @@ class DatabasePool:
keyvalues,
retcol,
allow_none=allow_none,
+ db_autocommit=True,
)
@overload
@@ -1346,7 +1397,12 @@ class DatabasePool:
Results in a list
"""
return await self.runInteraction(
- desc, self.simple_select_onecol_txn, table, keyvalues, retcol
+ desc,
+ self.simple_select_onecol_txn,
+ table,
+ keyvalues,
+ retcol,
+ db_autocommit=True,
)
async def simple_select_list(
@@ -1371,7 +1427,12 @@ class DatabasePool:
A list of dictionaries.
"""
return await self.runInteraction(
- desc, self.simple_select_list_txn, table, keyvalues, retcols
+ desc,
+ self.simple_select_list_txn,
+ table,
+ keyvalues,
+ retcols,
+ db_autocommit=True,
)
@classmethod
@@ -1450,6 +1511,7 @@ class DatabasePool:
chunk,
keyvalues,
retcols,
+ db_autocommit=True,
)
results.extend(rows)
@@ -1548,7 +1610,12 @@ class DatabasePool:
desc: description of the transaction, for logging and metrics
"""
await self.runInteraction(
- desc, self.simple_update_one_txn, table, keyvalues, updatevalues
+ desc,
+ self.simple_update_one_txn,
+ table,
+ keyvalues,
+ updatevalues,
+ db_autocommit=True,
)
@classmethod
@@ -1607,7 +1674,9 @@ class DatabasePool:
keyvalues: dict of column names and values to select the row with
desc: description of the transaction, for logging and metrics
"""
- await self.runInteraction(desc, self.simple_delete_one_txn, table, keyvalues)
+ await self.runInteraction(
+ desc, self.simple_delete_one_txn, table, keyvalues, db_autocommit=True,
+ )
@staticmethod
def simple_delete_one_txn(
@@ -1646,7 +1715,9 @@ class DatabasePool:
Returns:
The number of deleted rows.
"""
- return await self.runInteraction(desc, self.simple_delete_txn, table, keyvalues)
+ return await self.runInteraction(
+ desc, self.simple_delete_txn, table, keyvalues, db_autocommit=True
+ )
@staticmethod
def simple_delete_txn(
@@ -1694,7 +1765,13 @@ class DatabasePool:
Number rows deleted
"""
return await self.runInteraction(
- desc, self.simple_delete_many_txn, table, column, iterable, keyvalues
+ desc,
+ self.simple_delete_many_txn,
+ table,
+ column,
+ iterable,
+ keyvalues,
+ db_autocommit=True,
)
@staticmethod
@@ -1860,7 +1937,13 @@ class DatabasePool:
"""
return await self.runInteraction(
- desc, self.simple_search_list_txn, table, term, col, retcols
+ desc,
+ self.simple_search_list_txn,
+ table,
+ term,
+ col,
+ retcols,
+ db_autocommit=True,
)
@classmethod
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index fdb17745f6..ba3b1769b0 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1270,6 +1270,10 @@ class PersistEventsStore:
)
def _store_retention_policy_for_room_txn(self, txn, event):
+ if not event.is_state():
+ logger.debug("Ignoring non-state m.room.retention event")
+ return
+
if hasattr(event, "content") and (
"min_lifetime" in event.content or "max_lifetime" in event.content
):
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index ad43bb05ab..f8f4bb9b3f 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -122,9 +122,7 @@ class KeyStore(SQLBaseStore):
# param, which is itself the 2-tuple (server_name, key_id).
invalidations.append((server_name, key_id))
- await self.db_pool.runInteraction(
- "store_server_verify_keys",
- self.db_pool.simple_upsert_many_txn,
+ await self.db_pool.simple_upsert_many(
table="server_signature_keys",
key_names=("server_name", "key_id"),
key_values=key_values,
@@ -135,6 +133,7 @@ class KeyStore(SQLBaseStore):
"verify_key",
),
value_values=value_values,
+ desc="store_server_verify_keys",
)
invalidate = self._get_server_verify_key.invalidate
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 0acf0617ca..79b01d16f9 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -281,9 +281,14 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
a_day_in_milliseconds = 24 * 60 * 60 * 1000
now = self._clock.time_msec()
+ # A note on user_agent. Technically a given device can have multiple
+ # user agents, so we need to decide which one to pick. We could have handled this
+ # in number of ways, but given that we don't _that_ much have gone for MAX()
+ # For more details of the other options considered see
+ # https://github.com/matrix-org/synapse/pull/8503#discussion_r502306111
sql = """
- INSERT INTO user_daily_visits (user_id, device_id, timestamp)
- SELECT u.user_id, u.device_id, ?
+ INSERT INTO user_daily_visits (user_id, device_id, timestamp, user_agent)
+ SELECT u.user_id, u.device_id, ?, MAX(u.user_agent)
FROM user_ips AS u
LEFT JOIN (
SELECT user_id, device_id, timestamp FROM user_daily_visits
@@ -294,7 +299,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
WHERE last_seen > ? AND last_seen <= ?
AND udv.timestamp IS NULL AND users.is_guest=0
AND users.appservice_id IS NULL
- GROUP BY u.user_id, u.device_id
+ GROUP BY u.user_id, u.device_id, u.user_agent
"""
# This means that the day has rolled over but there could still
diff --git a/synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql b/synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql
new file mode 100644
index 0000000000..b0b5dcddce
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ -- Add new column to user_daily_visits to track user agent
+ALTER TABLE user_daily_visits
+ ADD COLUMN user_agent TEXT;
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 7d46090267..59207cadd4 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -208,42 +208,56 @@ class TransactionStore(TransactionWorkerStore):
"""
self._destination_retry_cache.pop(destination, None)
- return await self.db_pool.runInteraction(
- "set_destination_retry_timings",
- self._set_destination_retry_timings,
- destination,
- failure_ts,
- retry_last_ts,
- retry_interval,
- )
+ if self.database_engine.can_native_upsert:
+ return await self.db_pool.runInteraction(
+ "set_destination_retry_timings",
+ self._set_destination_retry_timings_native,
+ destination,
+ failure_ts,
+ retry_last_ts,
+ retry_interval,
+ db_autocommit=True, # Safe as its a single upsert
+ )
+ else:
+ return await self.db_pool.runInteraction(
+ "set_destination_retry_timings",
+ self._set_destination_retry_timings_emulated,
+ destination,
+ failure_ts,
+ retry_last_ts,
+ retry_interval,
+ )
- def _set_destination_retry_timings(
+ def _set_destination_retry_timings_native(
self, txn, destination, failure_ts, retry_last_ts, retry_interval
):
+ assert self.database_engine.can_native_upsert
+
+ # Upsert retry time interval if retry_interval is zero (i.e. we're
+ # resetting it) or greater than the existing retry interval.
+ #
+ # WARNING: This is executed in autocommit, so we shouldn't add any more
+ # SQL calls in here (without being very careful).
+ sql = """
+ INSERT INTO destinations (
+ destination, failure_ts, retry_last_ts, retry_interval
+ )
+ VALUES (?, ?, ?, ?)
+ ON CONFLICT (destination) DO UPDATE SET
+ failure_ts = EXCLUDED.failure_ts,
+ retry_last_ts = EXCLUDED.retry_last_ts,
+ retry_interval = EXCLUDED.retry_interval
+ WHERE
+ EXCLUDED.retry_interval = 0
+ OR destinations.retry_interval IS NULL
+ OR destinations.retry_interval < EXCLUDED.retry_interval
+ """
- if self.database_engine.can_native_upsert:
- # Upsert retry time interval if retry_interval is zero (i.e. we're
- # resetting it) or greater than the existing retry interval.
-
- sql = """
- INSERT INTO destinations (
- destination, failure_ts, retry_last_ts, retry_interval
- )
- VALUES (?, ?, ?, ?)
- ON CONFLICT (destination) DO UPDATE SET
- failure_ts = EXCLUDED.failure_ts,
- retry_last_ts = EXCLUDED.retry_last_ts,
- retry_interval = EXCLUDED.retry_interval
- WHERE
- EXCLUDED.retry_interval = 0
- OR destinations.retry_interval IS NULL
- OR destinations.retry_interval < EXCLUDED.retry_interval
- """
-
- txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
-
- return
+ txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
+ def _set_destination_retry_timings_emulated(
+ self, txn, destination, failure_ts, retry_last_ts, retry_interval
+ ):
self.database_engine.lock_table(txn, "destinations")
# We need to be careful here as the data may have changed from under us
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 5a390ff2f6..d87ceec6da 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -480,21 +480,16 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
user_id_tuples: iterable of 2-tuple of user IDs.
"""
- def _add_users_who_share_room_txn(txn):
- self.db_pool.simple_upsert_many_txn(
- txn,
- table="users_who_share_private_rooms",
- key_names=["user_id", "other_user_id", "room_id"],
- key_values=[
- (user_id, other_user_id, room_id)
- for user_id, other_user_id in user_id_tuples
- ],
- value_names=(),
- value_values=None,
- )
-
- await self.db_pool.runInteraction(
- "add_users_who_share_room", _add_users_who_share_room_txn
+ await self.db_pool.simple_upsert_many(
+ table="users_who_share_private_rooms",
+ key_names=["user_id", "other_user_id", "room_id"],
+ key_values=[
+ (user_id, other_user_id, room_id)
+ for user_id, other_user_id in user_id_tuples
+ ],
+ value_names=(),
+ value_values=None,
+ desc="add_users_who_share_room",
)
async def add_users_in_public_rooms(
@@ -508,19 +503,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
user_ids
"""
- def _add_users_in_public_rooms_txn(txn):
-
- self.db_pool.simple_upsert_many_txn(
- txn,
- table="users_in_public_rooms",
- key_names=["user_id", "room_id"],
- key_values=[(user_id, room_id) for user_id in user_ids],
- value_names=(),
- value_values=None,
- )
-
- await self.db_pool.runInteraction(
- "add_users_in_public_rooms", _add_users_in_public_rooms_txn
+ await self.db_pool.simple_upsert_many(
+ table="users_in_public_rooms",
+ key_names=["user_id", "room_id"],
+ key_values=[(user_id, room_id) for user_id in user_ids],
+ value_names=(),
+ value_values=None,
+ desc="add_users_in_public_rooms",
)
async def delete_all_from_user_dir(self) -> None:
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 3d8da48f2d..02d71302ea 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -618,14 +618,7 @@ class _MultiWriterCtxManager:
db_autocommit=True,
)
- # Assert the fetched ID is actually greater than any ID we've already
- # seen. If not, then the sequence and table have got out of sync
- # somehow.
with self.id_gen._lock:
- assert max(self.id_gen._current_positions.values(), default=0) < min(
- self.stream_ids
- )
-
self.id_gen._unfinished_ids.update(self.stream_ids)
if self.multiple_ids is None:
|