diff --git a/changelog.d/8488.misc b/changelog.d/8488.misc
new file mode 100644
index 0000000000..237cb3b311
--- /dev/null
+++ b/changelog.d/8488.misc
@@ -0,0 +1 @@
+Allow events to be sent to clients sooner when using sharded event persisters.
diff --git a/changelog.d/8503.misc b/changelog.d/8503.misc
new file mode 100644
index 0000000000..edb1be8aa8
--- /dev/null
+++ b/changelog.d/8503.misc
@@ -0,0 +1 @@
+Add user agent to user_daily_visits table.
diff --git a/changelog.d/8517.bugfix b/changelog.d/8517.bugfix
new file mode 100644
index 0000000000..1ab623c59f
--- /dev/null
+++ b/changelog.d/8517.bugfix
@@ -0,0 +1 @@
+Fix error code for `/profile/{userId}/displayname` to be `M_BAD_JSON`.
diff --git a/changelog.d/8526.doc b/changelog.d/8526.doc
new file mode 100644
index 0000000000..cbf48680c1
--- /dev/null
+++ b/changelog.d/8526.doc
@@ -0,0 +1 @@
+Added note about docker in manhole.md regarding which ip address to bind to. Contributed by @Maquis196.
diff --git a/changelog.d/8527.bugfix b/changelog.d/8527.bugfix
new file mode 100644
index 0000000000..727e0ba299
--- /dev/null
+++ b/changelog.d/8527.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in v1.7.0 that could cause Synapse to insert values from non-state `m.room.retention` events into the `room_retention` database table.
diff --git a/changelog.d/8529.doc b/changelog.d/8529.doc
new file mode 100644
index 0000000000..6e710e6527
--- /dev/null
+++ b/changelog.d/8529.doc
@@ -0,0 +1 @@
+Document the new behaviour of the `allowed_lifetime_min` and `allowed_lifetime_max` settings in the room retention configuration.
diff --git a/changelog.d/8530.bugfix b/changelog.d/8530.bugfix
new file mode 100644
index 0000000000..443d88424e
--- /dev/null
+++ b/changelog.d/8530.bugfix
@@ -0,0 +1 @@
+Fix rare bug where sending an event would fail due to a racey assertion.
diff --git a/changelog.d/8536.bugfix b/changelog.d/8536.bugfix
new file mode 100644
index 0000000000..8d238cc008
--- /dev/null
+++ b/changelog.d/8536.bugfix
@@ -0,0 +1 @@
+Fix not sending events over federation when using sharded event writers.
diff --git a/changelog.d/8537.misc b/changelog.d/8537.misc
new file mode 100644
index 0000000000..26309b5b93
--- /dev/null
+++ b/changelog.d/8537.misc
@@ -0,0 +1 @@
+Factor out common code between `RoomMemberHandler._locally_reject_invite` and `EventCreationHandler.create_event`.
diff --git a/changelog.d/8542.misc b/changelog.d/8542.misc
new file mode 100644
index 0000000000..63149fd9b9
--- /dev/null
+++ b/changelog.d/8542.misc
@@ -0,0 +1 @@
+Improve database performance by executing more queries without starting transactions.
diff --git a/docs/manhole.md b/docs/manhole.md
index 75b6ae40e0..37d1d7823c 100644
--- a/docs/manhole.md
+++ b/docs/manhole.md
@@ -5,22 +5,54 @@ The "manhole" allows server administrators to access a Python shell on a running
Synapse installation. This is a very powerful mechanism for administration and
debugging.
+**_Security Warning_**
+
+Note that this will give administrative access to synapse to **all users** with
+shell access to the server. It should therefore **not** be enabled in
+environments where untrusted users have shell access.
+
+***
+
To enable it, first uncomment the `manhole` listener configuration in
-`homeserver.yaml`:
+`homeserver.yaml`. The configuration is slightly different if you're using docker.
+
+#### Docker config
+
+If you are using Docker, set `bind_addresses` to `['0.0.0.0']` as shown:
```yaml
listeners:
- port: 9000
- bind_addresses: ['::1', '127.0.0.1']
+ bind_addresses: ['0.0.0.0']
type: manhole
```
-(`bind_addresses` in the above is important: it ensures that access to the
-manhole is only possible for local users).
+When using `docker run` to start the server, you will then need to change the command to the following to include the
+`manhole` port forwarding. The `-p 127.0.0.1:9000:9000` below is important: it
+ensures that access to the `manhole` is only possible for local users.
-Note that this will give administrative access to synapse to **all users** with
-shell access to the server. It should therefore **not** be enabled in
-environments where untrusted users have shell access.
+```bash
+docker run -d --name synapse \
+ --mount type=volume,src=synapse-data,dst=/data \
+ -p 8008:8008 \
+ -p 127.0.0.1:9000:9000 \
+ matrixdotorg/synapse:latest
+```
+
+#### Native config
+
+If you are not using docker, set `bind_addresses` to `['::1', '127.0.0.1']` as shown.
+The `bind_addresses` in the example below is important: it ensures that access to the
+`manhole` is only possible for local users).
+
+```yaml
+listeners:
+ - port: 9000
+ bind_addresses: ['::1', '127.0.0.1']
+ type: manhole
+```
+
+#### Accessing synapse manhole
Then restart synapse, and point an ssh client at port 9000 on localhost, using
the username `matrix`:
diff --git a/docs/message_retention_policies.md b/docs/message_retention_policies.md
index 1dd60bdad9..75d2028e17 100644
--- a/docs/message_retention_policies.md
+++ b/docs/message_retention_policies.md
@@ -136,24 +136,34 @@ the server's database.
### Lifetime limits
-**Note: this feature is mainly useful within a closed federation or on
-servers that don't federate, because there currently is no way to
-enforce these limits in an open federation.**
-
-Server admins can restrict the values their local users are allowed to
-use for both `min_lifetime` and `max_lifetime`. These limits can be
-defined as such in the `retention` section of the configuration file:
+Server admins can set limits on the values of `max_lifetime` to use when
+purging old events in a room. These limits can be defined as such in the
+`retention` section of the configuration file:
```yaml
allowed_lifetime_min: 1d
allowed_lifetime_max: 1y
```
-Here, `allowed_lifetime_min` is the lowest value a local user can set
-for both `min_lifetime` and `max_lifetime`, and `allowed_lifetime_max`
-is the highest value. Both parameters are optional (e.g. setting
-`allowed_lifetime_min` but not `allowed_lifetime_max` only enforces a
-minimum and no maximum).
+The limits are considered when running purge jobs. If necessary, the
+effective value of `max_lifetime` will be brought between
+`allowed_lifetime_min` and `allowed_lifetime_max` (inclusive).
+This means that, if the value of `max_lifetime` defined in the room's state
+is lower than `allowed_lifetime_min`, the value of `allowed_lifetime_min`
+will be used instead. Likewise, if the value of `max_lifetime` is higher
+than `allowed_lifetime_max`, the value of `allowed_lifetime_max` will be
+used instead.
+
+In the example above, we ensure Synapse never deletes events that are less
+than one day old, and that it always deletes events that are over a year
+old.
+
+If a default policy is set, and its `max_lifetime` value is lower than
+`allowed_lifetime_min` or higher than `allowed_lifetime_max`, the same
+process applies.
+
+Both parameters are optional; if one is omitted Synapse won't use it to
+adjust the effective value of `max_lifetime`.
Like other settings in this section, these parameters can be expressed
either as a duration or as a number of milliseconds.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index d53181deb1..1b511890aa 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -790,10 +790,6 @@ class FederationSenderHandler:
send_queue.process_rows_for_federation(self.federation_sender, rows)
await self.update_token(token)
- # We also need to poke the federation sender when new events happen
- elif stream_name == "events":
- self.federation_sender.notify_new_events(token)
-
# ... and when new receipts happen
elif stream_name == ReceiptsStream.NAME:
await self._on_new_receipts(rows)
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index b6c47be646..df4f950fec 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -97,32 +97,37 @@ class EventBuilder:
def is_state(self):
return self._state_key is not None
- async def build(self, prev_event_ids: List[str]) -> EventBase:
+ async def build(
+ self, prev_event_ids: List[str], auth_event_ids: Optional[List[str]]
+ ) -> EventBase:
"""Transform into a fully signed and hashed event
Args:
prev_event_ids: The event IDs to use as the prev events
+ auth_event_ids: The event IDs to use as the auth events.
+ Should normally be set to None, which will cause them to be calculated
+ based on the room state at the prev_events.
Returns:
The signed and hashed event.
"""
-
- state_ids = await self._state.get_current_state_ids(
- self.room_id, prev_event_ids
- )
- auth_ids = self._auth.compute_auth_events(self, state_ids)
+ if auth_event_ids is None:
+ state_ids = await self._state.get_current_state_ids(
+ self.room_id, prev_event_ids
+ )
+ auth_event_ids = self._auth.compute_auth_events(self, state_ids)
format_version = self.room_version.event_format
if format_version == EventFormatVersions.V1:
# The types of auth/prev events changes between event versions.
auth_events = await self._store.add_event_hashes(
- auth_ids
+ auth_event_ids
) # type: Union[List[str], List[Tuple[str, Dict[str, str]]]]
prev_events = await self._store.add_event_hashes(
prev_event_ids
) # type: Union[List[str], List[Tuple[str, Dict[str, str]]]]
else:
- auth_events = auth_ids
+ auth_events = auth_event_ids
prev_events = prev_event_ids
old_depth = await self._store.get_max_depth_of(prev_event_ids)
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index 9df35b54ba..5f9af8529b 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -83,6 +83,9 @@ class EventValidator:
Args:
event (FrozenEvent): The event to validate.
"""
+ if not event.is_state():
+ raise SynapseError(code=400, msg="must be a state event")
+
min_lifetime = event.content.get("min_lifetime")
max_lifetime = event.content.get("max_lifetime")
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 8e46957d15..5f1bf492c1 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -188,7 +188,7 @@ class FederationRemoteSendQueue:
for key in keys[:i]:
del self.edus[key]
- def notify_new_events(self, current_id):
+ def notify_new_events(self, max_token):
"""As per FederationSender"""
# We don't need to replicate this as it gets sent down a different
# stream.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index e33b29a42c..604cfd1935 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -40,7 +40,7 @@ from synapse.metrics import (
events_processed_counter,
)
from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import ReadReceipt
+from synapse.types import ReadReceipt, RoomStreamToken
from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__)
@@ -154,10 +154,15 @@ class FederationSender:
self._per_destination_queues[destination] = queue
return queue
- def notify_new_events(self, current_id: int) -> None:
+ def notify_new_events(self, max_token: RoomStreamToken) -> None:
"""This gets called when we have some new events we might want to
send out to other servers.
"""
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ current_id = max_token.stream
+
self._last_poked_id = max(current_id, self._last_poked_id)
if self._is_processing:
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 9d4e87dad6..c8d5e58035 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -27,6 +27,7 @@ from synapse.metrics import (
event_processing_loop_room_count,
)
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import RoomStreamToken
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
@@ -47,15 +48,17 @@ class ApplicationServicesHandler:
self.current_max = 0
self.is_processing = False
- async def notify_interested_services(self, current_id):
+ async def notify_interested_services(self, max_token: RoomStreamToken):
"""Notifies (pushes) all application services interested in this event.
Pushing is done asynchronously, so this method won't block for any
prolonged length of time.
-
- Args:
- current_id(int): The current maximum ID.
"""
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ current_id = max_token.stream
+
services = self.store.get_app_services()
if not services or not self.notify_appservices:
return
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c52e6824d3..f18f882596 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -437,9 +437,9 @@ class EventCreationHandler:
self,
requester: Requester,
event_dict: dict,
- token_id: Optional[str] = None,
txn_id: Optional[str] = None,
prev_event_ids: Optional[List[str]] = None,
+ auth_event_ids: Optional[List[str]] = None,
require_consent: bool = True,
) -> Tuple[EventBase, EventContext]:
"""
@@ -453,13 +453,18 @@ class EventCreationHandler:
Args:
requester
event_dict: An entire event
- token_id
txn_id
prev_event_ids:
the forward extremities to use as the prev_events for the
new event.
If None, they will be requested from the database.
+
+ auth_event_ids:
+ The event ids to use as the auth_events for the new event.
+ Should normally be left as None, which will cause them to be calculated
+ based on the room state at the prev_events.
+
require_consent: Whether to check if the requester has
consented to the privacy policy.
Raises:
@@ -511,14 +516,17 @@ class EventCreationHandler:
if require_consent and not is_exempt:
await self.assert_accepted_privacy_policy(requester)
- if token_id is not None:
- builder.internal_metadata.token_id = token_id
+ if requester.access_token_id is not None:
+ builder.internal_metadata.token_id = requester.access_token_id
if txn_id is not None:
builder.internal_metadata.txn_id = txn_id
event, context = await self.create_new_client_event(
- builder=builder, requester=requester, prev_event_ids=prev_event_ids,
+ builder=builder,
+ requester=requester,
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
)
# In an ideal world we wouldn't need the second part of this condition. However,
@@ -726,7 +734,7 @@ class EventCreationHandler:
return event, event.internal_metadata.stream_ordering
event, context = await self.create_event(
- requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
+ requester, event_dict, txn_id=txn_id
)
assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
@@ -757,6 +765,7 @@ class EventCreationHandler:
builder: EventBuilder,
requester: Optional[Requester] = None,
prev_event_ids: Optional[List[str]] = None,
+ auth_event_ids: Optional[List[str]] = None,
) -> Tuple[EventBase, EventContext]:
"""Create a new event for a local client
@@ -769,6 +778,11 @@ class EventCreationHandler:
If None, they will be requested from the database.
+ auth_event_ids:
+ The event ids to use as the auth_events for the new event.
+ Should normally be left as None, which will cause them to be calculated
+ based on the room state at the prev_events.
+
Returns:
Tuple of created event, context
"""
@@ -790,7 +804,9 @@ class EventCreationHandler:
builder.type == EventTypes.Create or len(prev_event_ids) > 0
), "Attempting to create an event with no prev_events"
- event = await builder.build(prev_event_ids=prev_event_ids)
+ event = await builder.build(
+ prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids
+ )
context = await self.state.compute_event_context(event)
if requester:
context.app_service = requester.app_service
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 93ed51063a..ec300d8877 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -214,7 +214,6 @@ class RoomCreationHandler(BaseHandler):
"replacement_room": new_room_id,
},
},
- token_id=requester.access_token_id,
)
old_room_version = await self.store.get_room_version_id(old_room_id)
await self.auth.check_from_context(
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0080eeaf8d..ec784030e9 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,12 +17,10 @@ import abc
import logging
import random
from http import HTTPStatus
-from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, Union
-
-from unpaddedbase64 import encode_base64
+from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
from synapse import types
-from synapse.api.constants import MAX_DEPTH, AccountDataTypes, EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.errors import (
AuthError,
Codes,
@@ -31,12 +29,8 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.api.ratelimiting import Ratelimiter
-from synapse.api.room_versions import EventFormatVersions
-from synapse.crypto.event_signing import compute_event_reference_hash
from synapse.events import EventBase
-from synapse.events.builder import create_local_event_from_event_dict
from synapse.events.snapshot import EventContext
-from synapse.events.validator import EventValidator
from synapse.storage.roommember import RoomsForUser
from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
from synapse.util.async_helpers import Linearizer
@@ -193,7 +187,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# For backwards compatibility:
"membership": membership,
},
- token_id=requester.access_token_id,
txn_id=txn_id,
prev_event_ids=prev_event_ids,
require_consent=require_consent,
@@ -1133,31 +1126,10 @@ class RoomMemberMasterHandler(RoomMemberHandler):
room_id = invite_event.room_id
target_user = invite_event.state_key
- room_version = await self.store.get_room_version(room_id)
content["membership"] = Membership.LEAVE
- # the auth events for the new event are the same as that of the invite, plus
- # the invite itself.
- #
- # the prev_events are just the invite.
- invite_hash = invite_event.event_id # type: Union[str, Tuple]
- if room_version.event_format == EventFormatVersions.V1:
- alg, h = compute_event_reference_hash(invite_event)
- invite_hash = (invite_event.event_id, {alg: encode_base64(h)})
-
- auth_events = tuple(invite_event.auth_events) + (invite_hash,)
- prev_events = (invite_hash,)
-
- # we cap depth of generated events, to ensure that they are not
- # rejected by other servers (and so that they can be persisted in
- # the db)
- depth = min(invite_event.depth + 1, MAX_DEPTH)
-
event_dict = {
- "depth": depth,
- "auth_events": auth_events,
- "prev_events": prev_events,
"type": EventTypes.Member,
"room_id": room_id,
"sender": target_user,
@@ -1165,24 +1137,23 @@ class RoomMemberMasterHandler(RoomMemberHandler):
"state_key": target_user,
}
- event = create_local_event_from_event_dict(
- clock=self.clock,
- hostname=self.hs.hostname,
- signing_key=self.hs.signing_key,
- room_version=room_version,
- event_dict=event_dict,
+ # the auth events for the new event are the same as that of the invite, plus
+ # the invite itself.
+ #
+ # the prev_events are just the invite.
+ prev_event_ids = [invite_event.event_id]
+ auth_event_ids = invite_event.auth_event_ids() + prev_event_ids
+
+ event, context = await self.event_creation_handler.create_event(
+ requester,
+ event_dict,
+ txn_id=txn_id,
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
)
event.internal_metadata.outlier = True
event.internal_metadata.out_of_band_membership = True
- if txn_id is not None:
- event.internal_metadata.txn_id = txn_id
- if requester.access_token_id is not None:
- event.internal_metadata.token_id = requester.access_token_id
-
- EventValidator().validate_new(event, self.config)
- context = await self.state_handler.compute_event_context(event)
- context.app_service = requester.app_service
result_event = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[UserID.from_string(target_user)],
)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 13adeed01e..51c830c91e 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -319,19 +319,19 @@ class Notifier:
)
if self.federation_sender:
- self.federation_sender.notify_new_events(max_room_stream_token.stream)
+ self.federation_sender.notify_new_events(max_room_stream_token)
async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
try:
await self.appservice_handler.notify_interested_services(
- max_room_stream_token.stream
+ max_room_stream_token
)
except Exception:
logger.exception("Error notifying application services of event")
async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
try:
- await self._pusher_pool.on_new_notifications(max_room_stream_token.stream)
+ await self._pusher_pool.on_new_notifications(max_room_stream_token)
except Exception:
logger.exception("Error pusher pool of event")
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 28bd8ab748..c6763971ee 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -18,6 +18,7 @@ import logging
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import RoomStreamToken
logger = logging.getLogger(__name__)
@@ -91,7 +92,12 @@ class EmailPusher:
pass
self.timed_call = None
- def on_new_notifications(self, max_stream_ordering):
+ def on_new_notifications(self, max_token: RoomStreamToken):
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ max_stream_ordering = max_token.stream
+
if self.max_stream_ordering:
self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 26706bf3e1..793d0db2d9 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes
from synapse.logging import opentracing
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
+from synapse.types import RoomStreamToken
from . import push_rule_evaluator, push_tools
@@ -114,7 +115,12 @@ class HttpPusher:
if should_check_for_notifs:
self._start_processing()
- def on_new_notifications(self, max_stream_ordering):
+ def on_new_notifications(self, max_token: RoomStreamToken):
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ max_stream_ordering = max_token.stream
+
self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering or 0
)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 76150e117b..0080c68ce2 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -24,6 +24,7 @@ from synapse.push import PusherConfigException
from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher
from synapse.push.pusher import PusherFactory
+from synapse.types import RoomStreamToken
from synapse.util.async_helpers import concurrently_execute
if TYPE_CHECKING:
@@ -186,11 +187,16 @@ class PusherPool:
)
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
- async def on_new_notifications(self, max_stream_id: int):
+ async def on_new_notifications(self, max_token: RoomStreamToken):
if not self.pushers:
# nothing to do here.
return
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ max_stream_id = max_token.stream
+
if max_stream_id < self._last_room_stream_id_seen:
# Nothing to do
return
@@ -214,7 +220,7 @@ class PusherPool:
if u in self.pushers:
for p in self.pushers[u].values():
- p.on_new_notifications(max_stream_id)
+ p.on_new_notifications(max_token)
except Exception:
logger.exception("Exception in pusher on_new_notifications")
diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py
index b686cd671f..e7fcd2b1ff 100644
--- a/synapse/rest/client/v1/profile.py
+++ b/synapse/rest/client/v1/profile.py
@@ -59,7 +59,9 @@ class ProfileDisplaynameRestServlet(RestServlet):
try:
new_name = content["displayname"]
except Exception:
- return 400, "Unable to parse name"
+ raise SynapseError(
+ code=400, msg="Unable to parse name", errcode=Codes.BAD_JSON,
+ )
await self.profile_handler.set_displayname(user, requester, new_name, is_admin)
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 0ba3a025cf..763722d6bc 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -893,6 +893,12 @@ class DatabasePool:
attempts = 0
while True:
try:
+ # We can autocommit if we are going to use native upserts
+ autocommit = (
+ self.engine.can_native_upsert
+ and table not in self._unsafe_to_upsert_tables
+ )
+
return await self.runInteraction(
desc,
self.simple_upsert_txn,
@@ -901,6 +907,7 @@ class DatabasePool:
values,
insertion_values,
lock=lock,
+ db_autocommit=autocommit,
)
except self.engine.module.IntegrityError as e:
attempts += 1
@@ -1063,6 +1070,43 @@ class DatabasePool:
)
txn.execute(sql, list(allvalues.values()))
+ async def simple_upsert_many(
+ self,
+ table: str,
+ key_names: Collection[str],
+ key_values: Collection[Iterable[Any]],
+ value_names: Collection[str],
+ value_values: Iterable[Iterable[Any]],
+ desc: str,
+ ) -> None:
+ """
+ Upsert, many times.
+
+ Args:
+ table: The table to upsert into
+ key_names: The key column names.
+ key_values: A list of each row's key column values.
+ value_names: The value column names
+ value_values: A list of each row's value column values.
+ Ignored if value_names is empty.
+ """
+
+ # We can autocommit if we are going to use native upserts
+ autocommit = (
+ self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
+ )
+
+ return await self.runInteraction(
+ desc,
+ self.simple_upsert_many_txn,
+ table,
+ key_names,
+ key_values,
+ value_names,
+ value_values,
+ db_autocommit=autocommit,
+ )
+
def simple_upsert_many_txn(
self,
txn: LoggingTransaction,
@@ -1214,7 +1258,13 @@ class DatabasePool:
desc: description of the transaction, for logging and metrics
"""
return await self.runInteraction(
- desc, self.simple_select_one_txn, table, keyvalues, retcols, allow_none
+ desc,
+ self.simple_select_one_txn,
+ table,
+ keyvalues,
+ retcols,
+ allow_none,
+ db_autocommit=True,
)
@overload
@@ -1265,6 +1315,7 @@ class DatabasePool:
keyvalues,
retcol,
allow_none=allow_none,
+ db_autocommit=True,
)
@overload
@@ -1346,7 +1397,12 @@ class DatabasePool:
Results in a list
"""
return await self.runInteraction(
- desc, self.simple_select_onecol_txn, table, keyvalues, retcol
+ desc,
+ self.simple_select_onecol_txn,
+ table,
+ keyvalues,
+ retcol,
+ db_autocommit=True,
)
async def simple_select_list(
@@ -1371,7 +1427,12 @@ class DatabasePool:
A list of dictionaries.
"""
return await self.runInteraction(
- desc, self.simple_select_list_txn, table, keyvalues, retcols
+ desc,
+ self.simple_select_list_txn,
+ table,
+ keyvalues,
+ retcols,
+ db_autocommit=True,
)
@classmethod
@@ -1450,6 +1511,7 @@ class DatabasePool:
chunk,
keyvalues,
retcols,
+ db_autocommit=True,
)
results.extend(rows)
@@ -1548,7 +1610,12 @@ class DatabasePool:
desc: description of the transaction, for logging and metrics
"""
await self.runInteraction(
- desc, self.simple_update_one_txn, table, keyvalues, updatevalues
+ desc,
+ self.simple_update_one_txn,
+ table,
+ keyvalues,
+ updatevalues,
+ db_autocommit=True,
)
@classmethod
@@ -1607,7 +1674,9 @@ class DatabasePool:
keyvalues: dict of column names and values to select the row with
desc: description of the transaction, for logging and metrics
"""
- await self.runInteraction(desc, self.simple_delete_one_txn, table, keyvalues)
+ await self.runInteraction(
+ desc, self.simple_delete_one_txn, table, keyvalues, db_autocommit=True,
+ )
@staticmethod
def simple_delete_one_txn(
@@ -1646,7 +1715,9 @@ class DatabasePool:
Returns:
The number of deleted rows.
"""
- return await self.runInteraction(desc, self.simple_delete_txn, table, keyvalues)
+ return await self.runInteraction(
+ desc, self.simple_delete_txn, table, keyvalues, db_autocommit=True
+ )
@staticmethod
def simple_delete_txn(
@@ -1694,7 +1765,13 @@ class DatabasePool:
Number rows deleted
"""
return await self.runInteraction(
- desc, self.simple_delete_many_txn, table, column, iterable, keyvalues
+ desc,
+ self.simple_delete_many_txn,
+ table,
+ column,
+ iterable,
+ keyvalues,
+ db_autocommit=True,
)
@staticmethod
@@ -1860,7 +1937,13 @@ class DatabasePool:
"""
return await self.runInteraction(
- desc, self.simple_search_list_txn, table, term, col, retcols
+ desc,
+ self.simple_search_list_txn,
+ table,
+ term,
+ col,
+ retcols,
+ db_autocommit=True,
)
@classmethod
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index fdb17745f6..ba3b1769b0 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1270,6 +1270,10 @@ class PersistEventsStore:
)
def _store_retention_policy_for_room_txn(self, txn, event):
+ if not event.is_state():
+ logger.debug("Ignoring non-state m.room.retention event")
+ return
+
if hasattr(event, "content") and (
"min_lifetime" in event.content or "max_lifetime" in event.content
):
diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py
index ad43bb05ab..f8f4bb9b3f 100644
--- a/synapse/storage/databases/main/keys.py
+++ b/synapse/storage/databases/main/keys.py
@@ -122,9 +122,7 @@ class KeyStore(SQLBaseStore):
# param, which is itself the 2-tuple (server_name, key_id).
invalidations.append((server_name, key_id))
- await self.db_pool.runInteraction(
- "store_server_verify_keys",
- self.db_pool.simple_upsert_many_txn,
+ await self.db_pool.simple_upsert_many(
table="server_signature_keys",
key_names=("server_name", "key_id"),
key_values=key_values,
@@ -135,6 +133,7 @@ class KeyStore(SQLBaseStore):
"verify_key",
),
value_values=value_values,
+ desc="store_server_verify_keys",
)
invalidate = self._get_server_verify_key.invalidate
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 0acf0617ca..79b01d16f9 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -281,9 +281,14 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
a_day_in_milliseconds = 24 * 60 * 60 * 1000
now = self._clock.time_msec()
+ # A note on user_agent. Technically a given device can have multiple
+ # user agents, so we need to decide which one to pick. We could have handled this
+ # in number of ways, but given that we don't _that_ much have gone for MAX()
+ # For more details of the other options considered see
+ # https://github.com/matrix-org/synapse/pull/8503#discussion_r502306111
sql = """
- INSERT INTO user_daily_visits (user_id, device_id, timestamp)
- SELECT u.user_id, u.device_id, ?
+ INSERT INTO user_daily_visits (user_id, device_id, timestamp, user_agent)
+ SELECT u.user_id, u.device_id, ?, MAX(u.user_agent)
FROM user_ips AS u
LEFT JOIN (
SELECT user_id, device_id, timestamp FROM user_daily_visits
@@ -294,7 +299,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
WHERE last_seen > ? AND last_seen <= ?
AND udv.timestamp IS NULL AND users.is_guest=0
AND users.appservice_id IS NULL
- GROUP BY u.user_id, u.device_id
+ GROUP BY u.user_id, u.device_id, u.user_agent
"""
# This means that the day has rolled over but there could still
diff --git a/synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql b/synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql
new file mode 100644
index 0000000000..b0b5dcddce
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/20user_daily_visits.sql
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ -- Add new column to user_daily_visits to track user agent
+ALTER TABLE user_daily_visits
+ ADD COLUMN user_agent TEXT;
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 7d46090267..59207cadd4 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -208,42 +208,56 @@ class TransactionStore(TransactionWorkerStore):
"""
self._destination_retry_cache.pop(destination, None)
- return await self.db_pool.runInteraction(
- "set_destination_retry_timings",
- self._set_destination_retry_timings,
- destination,
- failure_ts,
- retry_last_ts,
- retry_interval,
- )
+ if self.database_engine.can_native_upsert:
+ return await self.db_pool.runInteraction(
+ "set_destination_retry_timings",
+ self._set_destination_retry_timings_native,
+ destination,
+ failure_ts,
+ retry_last_ts,
+ retry_interval,
+ db_autocommit=True, # Safe as its a single upsert
+ )
+ else:
+ return await self.db_pool.runInteraction(
+ "set_destination_retry_timings",
+ self._set_destination_retry_timings_emulated,
+ destination,
+ failure_ts,
+ retry_last_ts,
+ retry_interval,
+ )
- def _set_destination_retry_timings(
+ def _set_destination_retry_timings_native(
self, txn, destination, failure_ts, retry_last_ts, retry_interval
):
+ assert self.database_engine.can_native_upsert
+
+ # Upsert retry time interval if retry_interval is zero (i.e. we're
+ # resetting it) or greater than the existing retry interval.
+ #
+ # WARNING: This is executed in autocommit, so we shouldn't add any more
+ # SQL calls in here (without being very careful).
+ sql = """
+ INSERT INTO destinations (
+ destination, failure_ts, retry_last_ts, retry_interval
+ )
+ VALUES (?, ?, ?, ?)
+ ON CONFLICT (destination) DO UPDATE SET
+ failure_ts = EXCLUDED.failure_ts,
+ retry_last_ts = EXCLUDED.retry_last_ts,
+ retry_interval = EXCLUDED.retry_interval
+ WHERE
+ EXCLUDED.retry_interval = 0
+ OR destinations.retry_interval IS NULL
+ OR destinations.retry_interval < EXCLUDED.retry_interval
+ """
- if self.database_engine.can_native_upsert:
- # Upsert retry time interval if retry_interval is zero (i.e. we're
- # resetting it) or greater than the existing retry interval.
-
- sql = """
- INSERT INTO destinations (
- destination, failure_ts, retry_last_ts, retry_interval
- )
- VALUES (?, ?, ?, ?)
- ON CONFLICT (destination) DO UPDATE SET
- failure_ts = EXCLUDED.failure_ts,
- retry_last_ts = EXCLUDED.retry_last_ts,
- retry_interval = EXCLUDED.retry_interval
- WHERE
- EXCLUDED.retry_interval = 0
- OR destinations.retry_interval IS NULL
- OR destinations.retry_interval < EXCLUDED.retry_interval
- """
-
- txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
-
- return
+ txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
+ def _set_destination_retry_timings_emulated(
+ self, txn, destination, failure_ts, retry_last_ts, retry_interval
+ ):
self.database_engine.lock_table(txn, "destinations")
# We need to be careful here as the data may have changed from under us
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 5a390ff2f6..d87ceec6da 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -480,21 +480,16 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
user_id_tuples: iterable of 2-tuple of user IDs.
"""
- def _add_users_who_share_room_txn(txn):
- self.db_pool.simple_upsert_many_txn(
- txn,
- table="users_who_share_private_rooms",
- key_names=["user_id", "other_user_id", "room_id"],
- key_values=[
- (user_id, other_user_id, room_id)
- for user_id, other_user_id in user_id_tuples
- ],
- value_names=(),
- value_values=None,
- )
-
- await self.db_pool.runInteraction(
- "add_users_who_share_room", _add_users_who_share_room_txn
+ await self.db_pool.simple_upsert_many(
+ table="users_who_share_private_rooms",
+ key_names=["user_id", "other_user_id", "room_id"],
+ key_values=[
+ (user_id, other_user_id, room_id)
+ for user_id, other_user_id in user_id_tuples
+ ],
+ value_names=(),
+ value_values=None,
+ desc="add_users_who_share_room",
)
async def add_users_in_public_rooms(
@@ -508,19 +503,13 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
user_ids
"""
- def _add_users_in_public_rooms_txn(txn):
-
- self.db_pool.simple_upsert_many_txn(
- txn,
- table="users_in_public_rooms",
- key_names=["user_id", "room_id"],
- key_values=[(user_id, room_id) for user_id in user_ids],
- value_names=(),
- value_values=None,
- )
-
- await self.db_pool.runInteraction(
- "add_users_in_public_rooms", _add_users_in_public_rooms_txn
+ await self.db_pool.simple_upsert_many(
+ table="users_in_public_rooms",
+ key_names=["user_id", "room_id"],
+ key_values=[(user_id, room_id) for user_id in user_ids],
+ value_names=(),
+ value_values=None,
+ desc="add_users_in_public_rooms",
)
async def delete_all_from_user_dir(self) -> None:
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 3d8da48f2d..02d71302ea 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -618,14 +618,7 @@ class _MultiWriterCtxManager:
db_autocommit=True,
)
- # Assert the fetched ID is actually greater than any ID we've already
- # seen. If not, then the sequence and table have got out of sync
- # somehow.
with self.id_gen._lock:
- assert max(self.id_gen._current_positions.values(), default=0) < min(
- self.stream_ids
- )
-
self.id_gen._unfinished_ids.update(self.stream_ids)
if self.multiple_ids is None:
diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py
index 2a0b7c1b56..ee4f3da31c 100644
--- a/tests/handlers/test_appservice.py
+++ b/tests/handlers/test_appservice.py
@@ -18,6 +18,7 @@ from mock import Mock
from twisted.internet import defer
from synapse.handlers.appservice import ApplicationServicesHandler
+from synapse.types import RoomStreamToken
from tests.test_utils import make_awaitable
from tests.utils import MockClock
@@ -61,7 +62,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
defer.succeed((0, [event])),
defer.succeed((0, [])),
]
- yield defer.ensureDeferred(self.handler.notify_interested_services(0))
+ yield defer.ensureDeferred(
+ self.handler.notify_interested_services(RoomStreamToken(None, 0))
+ )
self.mock_scheduler.submit_event_for_as.assert_called_once_with(
interested_service, event
)
@@ -80,7 +83,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
defer.succeed((0, [event])),
defer.succeed((0, [])),
]
- yield defer.ensureDeferred(self.handler.notify_interested_services(0))
+ yield defer.ensureDeferred(
+ self.handler.notify_interested_services(RoomStreamToken(None, 0))
+ )
self.mock_as_api.query_user.assert_called_once_with(services[0], user_id)
@defer.inlineCallbacks
@@ -97,7 +102,9 @@ class AppServiceHandlerTestCase(unittest.TestCase):
defer.succeed((0, [event])),
defer.succeed((0, [])),
]
- yield defer.ensureDeferred(self.handler.notify_interested_services(0))
+ yield defer.ensureDeferred(
+ self.handler.notify_interested_services(RoomStreamToken(None, 0))
+ )
self.assertFalse(
self.mock_as_api.query_user.called,
"query_user called when it shouldn't have been.",
diff --git a/tests/handlers/test_message.py b/tests/handlers/test_message.py
index 64e28bc639..9f6f21a6e2 100644
--- a/tests/handlers/test_message.py
+++ b/tests/handlers/test_message.py
@@ -66,7 +66,6 @@ class EventCreationTestCase(unittest.HomeserverTestCase):
"sender": self.requester.user.to_string(),
"content": {"msgtype": "m.text", "body": random_string(5)},
},
- token_id=self.token_id,
txn_id=txn_id,
)
)
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index 914c82e7a8..8ed67640f8 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -615,7 +615,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
self.store.get_latest_event_ids_in_room(room_id)
)
- event = self.get_success(builder.build(prev_event_ids))
+ event = self.get_success(builder.build(prev_event_ids, None))
self.get_success(self.federation_handler.on_receive_pdu(hostname, event))
diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py
index 9c4a9c3563..779745ae9d 100644
--- a/tests/replication/test_federation_sender_shard.py
+++ b/tests/replication/test_federation_sender_shard.py
@@ -226,7 +226,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
}
builder = factory.for_room_version(room_version, event_dict)
- join_event = self.get_success(builder.build(prev_event_ids))
+ join_event = self.get_success(builder.build(prev_event_ids, None))
self.get_success(federation.on_send_join_request(remote_server, join_event))
self.replicate()
diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py
index 6068d14905..82cf033d4e 100644
--- a/tests/replication/test_sharded_event_persister.py
+++ b/tests/replication/test_sharded_event_persister.py
@@ -14,8 +14,12 @@
# limitations under the License.
import logging
+from mock import patch
+
+from synapse.api.room_versions import RoomVersion
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
+from synapse.rest.client.v2_alpha import sync
from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.utils import USE_POSTGRES_FOR_TESTS
@@ -36,6 +40,7 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
admin.register_servlets_for_client_rest_resource,
room.register_servlets,
login.register_servlets,
+ sync.register_servlets,
]
def prepare(self, reactor, clock, hs):
@@ -43,6 +48,9 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
self.other_user_id = self.register_user("otheruser", "pass")
self.other_access_token = self.login("otheruser", "pass")
+ self.room_creator = self.hs.get_room_creation_handler()
+ self.store = hs.get_datastore()
+
def default_config(self):
conf = super().default_config()
conf["redis"] = {"enabled": "true"}
@@ -53,6 +61,29 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
}
return conf
+ def _create_room(self, room_id: str, user_id: str, tok: str):
+ """Create a room with given room_id
+ """
+
+ # We control the room ID generation by patching out the
+ # `_generate_room_id` method
+ async def generate_room(
+ creator_id: str, is_public: bool, room_version: RoomVersion
+ ):
+ await self.store.store_room(
+ room_id=room_id,
+ room_creator_user_id=creator_id,
+ is_public=is_public,
+ room_version=room_version,
+ )
+ return room_id
+
+ with patch(
+ "synapse.handlers.room.RoomCreationHandler._generate_room_id"
+ ) as mock:
+ mock.side_effect = generate_room
+ self.helper.create_room_as(user_id, tok=tok)
+
def test_basic(self):
"""Simple test to ensure that multiple rooms can be created and joined,
and that different rooms get handled by different instances.
@@ -100,3 +131,189 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
self.assertTrue(persisted_on_1)
self.assertTrue(persisted_on_2)
+
+ def test_vector_clock_token(self):
+ """Tests that using a stream token with a vector clock component works
+ correctly with basic /sync and /messages usage.
+ """
+
+ self.make_worker_hs(
+ "synapse.app.generic_worker", {"worker_name": "worker1"},
+ )
+
+ worker_hs2 = self.make_worker_hs(
+ "synapse.app.generic_worker", {"worker_name": "worker2"},
+ )
+
+ sync_hs = self.make_worker_hs(
+ "synapse.app.generic_worker", {"worker_name": "sync"},
+ )
+
+ # Specially selected room IDs that get persisted on different workers.
+ room_id1 = "!foo:test"
+ room_id2 = "!baz:test"
+
+ self.assertEqual(
+ self.hs.config.worker.events_shard_config.get_instance(room_id1), "worker1"
+ )
+ self.assertEqual(
+ self.hs.config.worker.events_shard_config.get_instance(room_id2), "worker2"
+ )
+
+ user_id = self.register_user("user", "pass")
+ access_token = self.login("user", "pass")
+
+ store = self.hs.get_datastore()
+
+ # Create two room on the different workers.
+ self._create_room(room_id1, user_id, access_token)
+ self._create_room(room_id2, user_id, access_token)
+
+ # The other user joins
+ self.helper.join(
+ room=room_id1, user=self.other_user_id, tok=self.other_access_token
+ )
+ self.helper.join(
+ room=room_id2, user=self.other_user_id, tok=self.other_access_token
+ )
+
+ # Do an initial sync so that we're up to date.
+ request, channel = self.make_request("GET", "/sync", access_token=access_token)
+ self.render_on_worker(sync_hs, request)
+ next_batch = channel.json_body["next_batch"]
+
+ # We now gut wrench into the events stream MultiWriterIdGenerator on
+ # worker2 to mimic it getting stuck persisting an event. This ensures
+ # that when we send an event on worker1 we end up in a state where
+ # worker2 events stream position lags that on worker1, resulting in a
+ # RoomStreamToken with a non-empty instance map component.
+ #
+ # Worker2's event stream position will not advance until we call
+ # __aexit__ again.
+ actx = worker_hs2.get_datastore()._stream_id_gen.get_next()
+ self.get_success(actx.__aenter__())
+
+ response = self.helper.send(room_id1, body="Hi!", tok=self.other_access_token)
+ first_event_in_room1 = response["event_id"]
+
+ # Assert that the current stream token has an instance map component, as
+ # we are trying to test vector clock tokens.
+ room_stream_token = store.get_room_max_token()
+ self.assertNotEqual(len(room_stream_token.instance_map), 0)
+
+ # Check that syncing still gets the new event, despite the gap in the
+ # stream IDs.
+ request, channel = self.make_request(
+ "GET", "/sync?since={}".format(next_batch), access_token=access_token
+ )
+ self.render_on_worker(sync_hs, request)
+
+ # We should only see the new event and nothing else
+ self.assertIn(room_id1, channel.json_body["rooms"]["join"])
+ self.assertNotIn(room_id2, channel.json_body["rooms"]["join"])
+
+ events = channel.json_body["rooms"]["join"][room_id1]["timeline"]["events"]
+ self.assertListEqual(
+ [first_event_in_room1], [event["event_id"] for event in events]
+ )
+
+ # Get the next batch and makes sure its a vector clock style token.
+ vector_clock_token = channel.json_body["next_batch"]
+ self.assertTrue(vector_clock_token.startswith("m"))
+
+ # Now that we've got a vector clock token we finish the fake persisting
+ # an event we started above.
+ self.get_success(actx.__aexit__(None, None, None))
+
+ # Now try and send an event to the other rooom so that we can test that
+ # the vector clock style token works as a `since` token.
+ response = self.helper.send(room_id2, body="Hi!", tok=self.other_access_token)
+ first_event_in_room2 = response["event_id"]
+
+ request, channel = self.make_request(
+ "GET",
+ "/sync?since={}".format(vector_clock_token),
+ access_token=access_token,
+ )
+ self.render_on_worker(sync_hs, request)
+
+ self.assertNotIn(room_id1, channel.json_body["rooms"]["join"])
+ self.assertIn(room_id2, channel.json_body["rooms"]["join"])
+
+ events = channel.json_body["rooms"]["join"][room_id2]["timeline"]["events"]
+ self.assertListEqual(
+ [first_event_in_room2], [event["event_id"] for event in events]
+ )
+
+ next_batch = channel.json_body["next_batch"]
+
+ # We also want to test that the vector clock style token works with
+ # pagination. We do this by sending a couple of new events into the room
+ # and syncing again to get a prev_batch token for each room, then
+ # paginating from there back to the vector clock token.
+ self.helper.send(room_id1, body="Hi again!", tok=self.other_access_token)
+ self.helper.send(room_id2, body="Hi again!", tok=self.other_access_token)
+
+ request, channel = self.make_request(
+ "GET", "/sync?since={}".format(next_batch), access_token=access_token
+ )
+ self.render_on_worker(sync_hs, request)
+
+ prev_batch1 = channel.json_body["rooms"]["join"][room_id1]["timeline"][
+ "prev_batch"
+ ]
+ prev_batch2 = channel.json_body["rooms"]["join"][room_id2]["timeline"][
+ "prev_batch"
+ ]
+
+ # Paginating back in the first room should not produce any results, as
+ # no events have happened in it. This tests that we are correctly
+ # filtering results based on the vector clock portion.
+ request, channel = self.make_request(
+ "GET",
+ "/rooms/{}/messages?from={}&to={}&dir=b".format(
+ room_id1, prev_batch1, vector_clock_token
+ ),
+ access_token=access_token,
+ )
+ self.render_on_worker(sync_hs, request)
+ self.assertListEqual([], channel.json_body["chunk"])
+
+ # Paginating back on the second room should produce the first event
+ # again. This tests that pagination isn't completely broken.
+ request, channel = self.make_request(
+ "GET",
+ "/rooms/{}/messages?from={}&to={}&dir=b".format(
+ room_id2, prev_batch2, vector_clock_token
+ ),
+ access_token=access_token,
+ )
+ self.render_on_worker(sync_hs, request)
+ self.assertEqual(len(channel.json_body["chunk"]), 1)
+ self.assertEqual(
+ channel.json_body["chunk"][0]["event_id"], first_event_in_room2
+ )
+
+ # Paginating forwards should give the same results
+ request, channel = self.make_request(
+ "GET",
+ "/rooms/{}/messages?from={}&to={}&dir=f".format(
+ room_id1, vector_clock_token, prev_batch1
+ ),
+ access_token=access_token,
+ )
+ self.render_on_worker(sync_hs, request)
+ self.assertListEqual([], channel.json_body["chunk"])
+
+ request, channel = self.make_request(
+ "GET",
+ "/rooms/{}/messages?from={}&to={}&dir=f".format(
+ room_id2, vector_clock_token, prev_batch2,
+ ),
+ access_token=access_token,
+ )
+ self.render_on_worker(sync_hs, request)
+ self.assertEqual(len(channel.json_body["chunk"]), 1)
+ self.assertEqual(
+ channel.json_body["chunk"][0]["event_id"], first_event_in_room2
+ )
diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py
index 1ea35d60c1..d4f9e809db 100644
--- a/tests/storage/test_redaction.py
+++ b/tests/storage/test_redaction.py
@@ -236,9 +236,9 @@ class RedactionTestCase(unittest.HomeserverTestCase):
self._event_id = event_id
@defer.inlineCallbacks
- def build(self, prev_event_ids):
+ def build(self, prev_event_ids, auth_event_ids):
built_event = yield defer.ensureDeferred(
- self._base_builder.build(prev_event_ids)
+ self._base_builder.build(prev_event_ids, auth_event_ids)
)
built_event._event_id = self._event_id
diff --git a/tests/unittest.py b/tests/unittest.py
index 6c1661c92c..040b126a27 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -20,7 +20,7 @@ import hmac
import inspect
import logging
import time
-from typing import Optional, Tuple, Type, TypeVar, Union
+from typing import Optional, Tuple, Type, TypeVar, Union, overload
from mock import Mock, patch
@@ -364,6 +364,36 @@ class HomeserverTestCase(TestCase):
Function to optionally be overridden in subclasses.
"""
+ # Annoyingly mypy doesn't seem to pick up the fact that T is SynapseRequest
+ # when the `request` arg isn't given, so we define an explicit override to
+ # cover that case.
+ @overload
+ def make_request(
+ self,
+ method: Union[bytes, str],
+ path: Union[bytes, str],
+ content: Union[bytes, dict] = b"",
+ access_token: Optional[str] = None,
+ shorthand: bool = True,
+ federation_auth_origin: str = None,
+ content_is_form: bool = False,
+ ) -> Tuple[SynapseRequest, FakeChannel]:
+ ...
+
+ @overload
+ def make_request(
+ self,
+ method: Union[bytes, str],
+ path: Union[bytes, str],
+ content: Union[bytes, dict] = b"",
+ access_token: Optional[str] = None,
+ request: Type[T] = SynapseRequest,
+ shorthand: bool = True,
+ federation_auth_origin: str = None,
+ content_is_form: bool = False,
+ ) -> Tuple[T, FakeChannel]:
+ ...
+
def make_request(
self,
method: Union[bytes, str],
|