diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 9bd941b5a0..29a19b4572 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -29,6 +29,7 @@ from synapse.api.errors import (
SynapseError,
)
from synapse.logging.opentracing import log_kv, set_tag, trace
+from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util import stringutils
from synapse.util.async_helpers import Linearizer
@@ -535,6 +536,15 @@ class DeviceListUpdater(object):
iterable=True,
)
+ # Attempt to resync out of sync device lists every 30s.
+ self._resync_retry_in_progress = False
+ self.clock.looping_call(
+ run_as_background_process,
+ 30 * 1000,
+ func=self._maybe_retry_device_resync,
+ desc="_maybe_retry_device_resync",
+ )
+
@trace
@defer.inlineCallbacks
def incoming_device_list_update(self, origin, edu_content):
@@ -679,11 +689,50 @@ class DeviceListUpdater(object):
return False
@defer.inlineCallbacks
- def user_device_resync(self, user_id):
+ def _maybe_retry_device_resync(self):
+ """Retry to resync device lists that are out of sync, except if another retry is
+ in progress.
+ """
+ if self._resync_retry_in_progress:
+ return
+
+ try:
+ # Prevent another call of this function to retry resyncing device lists so
+ # we don't send too many requests.
+ self._resync_retry_in_progress = True
+ # Get all of the users that need resyncing.
+ need_resync = yield self.store.get_user_ids_requiring_device_list_resync()
+ # Iterate over the set of user IDs.
+ for user_id in need_resync:
+ # Try to resync the current user's devices list. Exception handling
+ # isn't necessary here, since user_device_resync catches all instances
+ # of "Exception" that might be raised from the federation request. This
+ # means that if an exception is raised by this function, it must be
+ # because of a database issue, which means _maybe_retry_device_resync
+ # probably won't be able to go much further anyway.
+ result = yield self.user_device_resync(
+ user_id=user_id, mark_failed_as_stale=False,
+ )
+ # user_device_resync only returns a result if it managed to successfully
+ # resync and update the database. Updating the table of users requiring
+ # resync isn't necessary here as user_device_resync already does it
+ # (through self.store.update_remote_device_list_cache).
+ if result:
+ logger.debug(
+ "Successfully resynced the device list for %s" % user_id,
+ )
+ finally:
+ # Allow future calls to retry resyncinc out of sync device lists.
+ self._resync_retry_in_progress = False
+
+ @defer.inlineCallbacks
+ def user_device_resync(self, user_id, mark_failed_as_stale=True):
"""Fetches all devices for a user and updates the device cache with them.
Args:
user_id (str): The user's id whose device_list will be updated.
+ mark_failed_as_stale (bool): Whether to mark the user's device list as stale
+ if the attempt to resync failed.
Returns:
Deferred[dict]: a dict with device info as under the "devices" in the result of this
request:
@@ -694,10 +743,23 @@ class DeviceListUpdater(object):
origin = get_domain_from_id(user_id)
try:
result = yield self.federation.query_user_devices(origin, user_id)
- except (NotRetryingDestination, RequestSendFailed, HttpResponseException):
- # TODO: Remember that we are now out of sync and try again
- # later
- logger.warning("Failed to handle device list update for %s", user_id)
+ except NotRetryingDestination:
+ if mark_failed_as_stale:
+ # Mark the remote user's device list as stale so we know we need to retry
+ # it later.
+ yield self.store.mark_remote_user_device_cache_as_stale(user_id)
+
+ return
+ except (RequestSendFailed, HttpResponseException) as e:
+ logger.warning(
+ "Failed to handle device list update for %s: %s", user_id, e,
+ )
+
+ if mark_failed_as_stale:
+ # Mark the remote user's device list as stale so we know we need to retry
+ # it later.
+ yield self.store.mark_remote_user_device_cache_as_stale(user_id)
+
# We abort on exceptions rather than accepting the update
# as otherwise synapse will 'forget' that its device list
# is out of date. If we bail then we will retry the resync
@@ -711,13 +773,17 @@ class DeviceListUpdater(object):
logger.info(e)
return
except Exception as e:
- # TODO: Remember that we are now out of sync and try again
- # later
set_tag("error", True)
log_kv(
{"message": "Exception raised by federation request", "exception": e}
)
logger.exception("Failed to handle device list update for %s", user_id)
+
+ if mark_failed_as_stale:
+ # Mark the remote user's device list as stale so we know we need to retry
+ # it later.
+ yield self.store.mark_remote_user_device_cache_as_stale(user_id)
+
return
log_kv({"result": result})
stream_id = result["stream_id"]
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 81d859f807..75ec90d267 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -40,6 +40,7 @@ from synapse.api.errors import (
Codes,
FederationDeniedError,
FederationError,
+ HttpResponseException,
RequestSendFailed,
SynapseError,
)
@@ -125,10 +126,10 @@ class FederationHandler(BaseHandler):
self._server_notices_mxid = hs.config.server_notices_mxid
self.config = hs.config
self.http_client = hs.get_simple_http_client()
+ self._instance_name = hs.get_instance_name()
+ self._replication = hs.get_replication_data_handler()
- self._send_events_to_master = ReplicationFederationSendEventsRestServlet.make_client(
- hs
- )
+ self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
self._notify_user_membership_change = ReplicationUserJoinedLeftRoomRestServlet.make_client(
hs
)
@@ -1038,6 +1039,12 @@ class FederationHandler(BaseHandler):
except SynapseError as e:
logger.info("Failed to backfill from %s because %s", dom, e)
continue
+ except HttpResponseException as e:
+ if 400 <= e.code < 500:
+ raise e.to_synapse_error()
+
+ logger.info("Failed to backfill from %s because %s", dom, e)
+ continue
except CodeMessageException as e:
if 400 <= e.code < 500:
raise
@@ -1214,7 +1221,7 @@ class FederationHandler(BaseHandler):
async def do_invite_join(
self, target_hosts: Iterable[str], room_id: str, joinee: str, content: JsonDict
- ) -> None:
+ ) -> Tuple[str, int]:
""" Attempts to join the `joinee` to the room `room_id` via the
servers contained in `target_hosts`.
@@ -1235,6 +1242,10 @@ class FederationHandler(BaseHandler):
content: The event content to use for the join event.
"""
+ # TODO: We should be able to call this on workers, but the upgrading of
+ # room stuff after join currently doesn't work on workers.
+ assert self.config.worker.worker_app is None
+
logger.debug("Joining %s to %s", joinee, room_id)
origin, event, room_version_obj = await self._make_and_verify_event(
@@ -1297,15 +1308,23 @@ class FederationHandler(BaseHandler):
room_id=room_id, room_version=room_version_obj,
)
- await self._persist_auth_tree(
+ max_stream_id = await self._persist_auth_tree(
origin, auth_chain, state, event, room_version_obj
)
+ # We wait here until this instance has seen the events come down
+ # replication (if we're using replication) as the below uses caches.
+ #
+ # TODO: Currently the events stream is written to from master
+ await self._replication.wait_for_stream_position(
+ self.config.worker.writers.events, "events", max_stream_id
+ )
+
# Check whether this room is the result of an upgrade of a room we already know
# about. If so, migrate over user information
predecessor = await self.store.get_room_predecessor(room_id)
if not predecessor or not isinstance(predecessor.get("room_id"), str):
- return
+ return event.event_id, max_stream_id
old_room_id = predecessor["room_id"]
logger.debug(
"Found predecessor for %s during remote join: %s", room_id, old_room_id
@@ -1318,6 +1337,7 @@ class FederationHandler(BaseHandler):
)
logger.debug("Finished joining %s to %s", joinee, room_id)
+ return event.event_id, max_stream_id
finally:
room_queue = self.room_queues[room_id]
del self.room_queues[room_id]
@@ -1547,7 +1567,7 @@ class FederationHandler(BaseHandler):
async def do_remotely_reject_invite(
self, target_hosts: Iterable[str], room_id: str, user_id: str, content: JsonDict
- ) -> EventBase:
+ ) -> Tuple[EventBase, int]:
origin, event, room_version = await self._make_and_verify_event(
target_hosts, room_id, user_id, "leave", content=content
)
@@ -1567,9 +1587,9 @@ class FederationHandler(BaseHandler):
await self.federation_client.send_leave(target_hosts, event)
context = await self.state_handler.compute_event_context(event)
- await self.persist_events_and_notify([(event, context)])
+ stream_id = await self.persist_events_and_notify([(event, context)])
- return event
+ return event, stream_id
async def _make_and_verify_event(
self,
@@ -1881,7 +1901,7 @@ class FederationHandler(BaseHandler):
state: List[EventBase],
event: EventBase,
room_version: RoomVersion,
- ) -> None:
+ ) -> int:
"""Checks the auth chain is valid (and passes auth checks) for the
state and event. Then persists the auth chain and state atomically.
Persists the event separately. Notifies about the persisted events
@@ -1975,7 +1995,7 @@ class FederationHandler(BaseHandler):
event, old_state=state
)
- await self.persist_events_and_notify([(event, new_event_context)])
+ return await self.persist_events_and_notify([(event, new_event_context)])
async def _prep_event(
self,
@@ -2828,7 +2848,7 @@ class FederationHandler(BaseHandler):
self,
event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
backfilled: bool = False,
- ) -> None:
+ ) -> int:
"""Persists events and tells the notifier/pushers about them, if
necessary.
@@ -2837,12 +2857,14 @@ class FederationHandler(BaseHandler):
backfilled: Whether these events are a result of
backfilling or not
"""
- if self.config.worker_app:
- await self._send_events_to_master(
+ if self.config.worker.writers.events != self._instance_name:
+ result = await self._send_events(
+ instance_name=self.config.worker.writers.events,
store=self.store,
event_and_contexts=event_and_contexts,
backfilled=backfilled,
)
+ return result["max_stream_id"]
else:
max_stream_id = await self.storage.persistence.persist_events(
event_and_contexts, backfilled=backfilled
@@ -2857,6 +2879,8 @@ class FederationHandler(BaseHandler):
for event, _ in event_and_contexts:
await self._notify_persisted_event(event, max_stream_id)
+ return max_stream_id
+
async def _notify_persisted_event(
self, event: EventBase, max_stream_id: int
) -> None:
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 0f0e632b62..9ed0d23b0f 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -290,8 +290,7 @@ class IdentityHandler(BaseHandler):
return changed
- @defer.inlineCallbacks
- def send_threepid_validation(
+ async def send_threepid_validation(
self,
email_address,
client_secret,
@@ -319,7 +318,7 @@ class IdentityHandler(BaseHandler):
"""
# Check that this email/client_secret/send_attempt combo is new or
# greater than what we've seen previously
- session = yield self.store.get_threepid_validation_session(
+ session = await self.store.get_threepid_validation_session(
"email", client_secret, address=email_address, validated=False
)
@@ -353,7 +352,7 @@ class IdentityHandler(BaseHandler):
# Send the mail with the link containing the token, client_secret
# and session_id
try:
- yield send_email_func(email_address, token, client_secret, session_id)
+ await send_email_func(email_address, token, client_secret, session_id)
except Exception:
logger.exception(
"Error sending threepid validation email to %s", email_address
@@ -364,7 +363,7 @@ class IdentityHandler(BaseHandler):
self.hs.clock.time_msec() + self.hs.config.email_validation_token_lifetime
)
- yield self.store.start_or_continue_validation_session(
+ await self.store.start_or_continue_validation_session(
"email",
email_address,
session_id,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a83c03da9a..84941eef45 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import Optional
+from typing import Optional, Tuple
from six import iteritems, itervalues, string_types
@@ -42,6 +42,7 @@ from synapse.api.errors import (
)
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
from synapse.api.urls import ConsentURIBuilder
+from synapse.events import EventBase
from synapse.events.validator import EventValidator
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -365,10 +366,13 @@ class EventCreationHandler(object):
self.notifier = hs.get_notifier()
self.config = hs.config
self.require_membership_for_aliases = hs.config.require_membership_for_aliases
+ self._is_event_writer = (
+ self.config.worker.writers.events == hs.get_instance_name()
+ )
self.room_invite_state_types = self.hs.config.room_invite_state_types
- self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
+ self.send_event = ReplicationSendEventRestServlet.make_client(hs)
# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
@@ -632,7 +636,9 @@ class EventCreationHandler(object):
msg = self._block_events_without_consent_error % {"consent_uri": consent_uri}
raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)
- async def send_nonmember_event(self, requester, event, context, ratelimit=True):
+ async def send_nonmember_event(
+ self, requester, event, context, ratelimit=True
+ ) -> int:
"""
Persists and notifies local clients and federation of an event.
@@ -641,6 +647,9 @@ class EventCreationHandler(object):
context (Context) the context of the event.
ratelimit (bool): Whether to rate limit this send.
is_guest (bool): Whether the sender is a guest.
+
+ Return:
+ The stream_id of the persisted event.
"""
if event.type == EventTypes.Member:
raise SynapseError(
@@ -661,7 +670,7 @@ class EventCreationHandler(object):
)
return prev_state
- await self.handle_new_client_event(
+ return await self.handle_new_client_event(
requester=requester, event=event, context=context, ratelimit=ratelimit
)
@@ -690,7 +699,7 @@ class EventCreationHandler(object):
async def create_and_send_nonmember_event(
self, requester, event_dict, ratelimit=True, txn_id=None
- ):
+ ) -> Tuple[EventBase, int]:
"""
Creates an event, then sends it.
@@ -713,10 +722,10 @@ class EventCreationHandler(object):
spam_error = "Spam is not permitted here"
raise SynapseError(403, spam_error, Codes.FORBIDDEN)
- await self.send_nonmember_event(
+ stream_id = await self.send_nonmember_event(
requester, event, context, ratelimit=ratelimit
)
- return event
+ return event, stream_id
@measure_func("create_new_client_event")
@defer.inlineCallbacks
@@ -776,7 +785,7 @@ class EventCreationHandler(object):
@measure_func("handle_new_client_event")
async def handle_new_client_event(
self, requester, event, context, ratelimit=True, extra_users=[]
- ):
+ ) -> int:
"""Processes a new event. This includes checking auth, persisting it,
notifying users, sending to remote servers, etc.
@@ -789,6 +798,9 @@ class EventCreationHandler(object):
context (EventContext)
ratelimit (bool)
extra_users (list(UserID)): Any extra users to notify about event
+
+ Return:
+ The stream_id of the persisted event.
"""
if event.is_state() and (event.type, event.state_key) == (
@@ -828,8 +840,9 @@ class EventCreationHandler(object):
success = False
try:
# If we're a worker we need to hit out to the master.
- if self._is_worker_app:
- await self.send_event_to_master(
+ if not self._is_event_writer:
+ result = await self.send_event(
+ instance_name=self.config.worker.writers.events,
event_id=event.event_id,
store=self.store,
requester=requester,
@@ -838,14 +851,17 @@ class EventCreationHandler(object):
ratelimit=ratelimit,
extra_users=extra_users,
)
+ stream_id = result["stream_id"]
+ event.internal_metadata.stream_ordering = stream_id
success = True
- return
+ return stream_id
- await self.persist_and_notify_client_event(
+ stream_id = await self.persist_and_notify_client_event(
requester, event, context, ratelimit=ratelimit, extra_users=extra_users
)
success = True
+ return stream_id
finally:
if not success:
# Ensure that we actually remove the entries in the push actions
@@ -888,13 +904,13 @@ class EventCreationHandler(object):
async def persist_and_notify_client_event(
self, requester, event, context, ratelimit=True, extra_users=[]
- ):
+ ) -> int:
"""Called when we have fully built the event, have already
calculated the push actions for the event, and checked auth.
- This should only be run on master.
+ This should only be run on the instance in charge of persisting events.
"""
- assert not self._is_worker_app
+ assert self._is_event_writer
if ratelimit:
# We check if this is a room admin redacting an event so that we
@@ -1078,6 +1094,8 @@ class EventCreationHandler(object):
# matters as sometimes presence code can take a while.
run_in_background(self._bump_active_time, requester.user)
+ return event_stream_id
+
async def _bump_active_time(self, user):
try:
presence = self.hs.get_presence_handler()
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 9ea11c0754..3594f3b00f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -193,6 +193,12 @@ class BasePresenceHandler(abc.ABC):
) -> None:
"""Set the presence state of the user. """
+ @abc.abstractmethod
+ async def bump_presence_active_time(self, user: UserID):
+ """We've seen the user do something that indicates they're interacting
+ with the app.
+ """
+
class PresenceHandler(BasePresenceHandler):
def __init__(self, hs: "synapse.server.HomeServer"):
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 73f9eeb399..61db3ccc43 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -22,6 +22,7 @@ import logging
import math
import string
from collections import OrderedDict
+from typing import Tuple
from six import iteritems, string_types
@@ -88,6 +89,8 @@ class RoomCreationHandler(BaseHandler):
self.room_member_handler = hs.get_room_member_handler()
self.config = hs.config
+ self._replication = hs.get_replication_data_handler()
+
# linearizer to stop two upgrades happening at once
self._upgrade_linearizer = Linearizer("room_upgrade_linearizer")
@@ -439,73 +442,78 @@ class RoomCreationHandler(BaseHandler):
new_room_id: str,
old_room_state: StateMap[str],
):
- directory_handler = self.hs.get_handlers().directory_handler
-
- aliases = await self.store.get_aliases_for_room(old_room_id)
-
# check to see if we have a canonical alias.
canonical_alias_event = None
canonical_alias_event_id = old_room_state.get((EventTypes.CanonicalAlias, ""))
if canonical_alias_event_id:
canonical_alias_event = await self.store.get_event(canonical_alias_event_id)
- # first we try to remove the aliases from the old room (we suppress sending
- # the room_aliases event until the end).
- #
- # Note that we'll only be able to remove aliases that (a) aren't owned by an AS,
- # and (b) unless the user is a server admin, which the user created.
- #
- # This is probably correct - given we don't allow such aliases to be deleted
- # normally, it would be odd to allow it in the case of doing a room upgrade -
- # but it makes the upgrade less effective, and you have to wonder why a room
- # admin can't remove aliases that point to that room anyway.
- # (cf https://github.com/matrix-org/synapse/issues/2360)
- #
- removed_aliases = []
- for alias_str in aliases:
- alias = RoomAlias.from_string(alias_str)
- try:
- await directory_handler.delete_association(requester, alias)
- removed_aliases.append(alias_str)
- except SynapseError as e:
- logger.warning("Unable to remove alias %s from old room: %s", alias, e)
-
- # if we didn't find any aliases, or couldn't remove anyway, we can skip the rest
- # of this.
- if not removed_aliases:
+ await self.store.update_aliases_for_room(old_room_id, new_room_id)
+
+ if not canonical_alias_event:
return
- # we can now add any aliases we successfully removed to the new room.
- for alias in removed_aliases:
- try:
- await directory_handler.create_association(
- requester,
- RoomAlias.from_string(alias),
- new_room_id,
- servers=(self.hs.hostname,),
- check_membership=False,
- )
- logger.info("Moved alias %s to new room", alias)
- except SynapseError as e:
- # I'm not really expecting this to happen, but it could if the spam
- # checking module decides it shouldn't, or similar.
- logger.error("Error adding alias %s to new room: %s", alias, e)
+ # If there is a canonical alias we need to update the one in the old
+ # room and set one in the new one.
+ old_canonical_alias_content = dict(canonical_alias_event.content)
+ new_canonical_alias_content = {}
+
+ canonical = canonical_alias_event.content.get("alias")
+ if canonical and self.hs.is_mine_id(canonical):
+ new_canonical_alias_content["alias"] = canonical
+ old_canonical_alias_content.pop("alias", None)
+
+ # We convert to a list as it will be a Tuple.
+ old_alt_aliases = list(old_canonical_alias_content.get("alt_aliases", []))
+ if old_alt_aliases:
+ old_canonical_alias_content["alt_aliases"] = old_alt_aliases
+ new_alt_aliases = new_canonical_alias_content.setdefault("alt_aliases", [])
+ for alias in canonical_alias_event.content.get("alt_aliases", []):
+ try:
+ if self.hs.is_mine_id(alias):
+ new_alt_aliases.append(alias)
+ old_alt_aliases.remove(alias)
+ except Exception:
+ logger.info(
+ "Invalid alias %s in canonical alias event %s",
+ alias,
+ canonical_alias_event_id,
+ )
+
+ if not old_alt_aliases:
+ old_canonical_alias_content.pop("alt_aliases")
# If a canonical alias event existed for the old room, fire a canonical
# alias event for the new room with a copy of the information.
try:
- if canonical_alias_event:
- await self.event_creation_handler.create_and_send_nonmember_event(
- requester,
- {
- "type": EventTypes.CanonicalAlias,
- "state_key": "",
- "room_id": new_room_id,
- "sender": requester.user.to_string(),
- "content": canonical_alias_event.content,
- },
- ratelimit=False,
- )
+ await self.event_creation_handler.create_and_send_nonmember_event(
+ requester,
+ {
+ "type": EventTypes.CanonicalAlias,
+ "state_key": "",
+ "room_id": old_room_id,
+ "sender": requester.user.to_string(),
+ "content": old_canonical_alias_content,
+ },
+ ratelimit=False,
+ )
+ except SynapseError as e:
+ # again I'm not really expecting this to fail, but if it does, I'd rather
+ # we returned the new room to the client at this point.
+ logger.error("Unable to send updated alias events in old room: %s", e)
+
+ try:
+ await self.event_creation_handler.create_and_send_nonmember_event(
+ requester,
+ {
+ "type": EventTypes.CanonicalAlias,
+ "state_key": "",
+ "room_id": new_room_id,
+ "sender": requester.user.to_string(),
+ "content": new_canonical_alias_content,
+ },
+ ratelimit=False,
+ )
except SynapseError as e:
# again I'm not really expecting this to fail, but if it does, I'd rather
# we returned the new room to the client at this point.
@@ -513,7 +521,7 @@ class RoomCreationHandler(BaseHandler):
async def create_room(
self, requester, config, ratelimit=True, creator_join_profile=None
- ):
+ ) -> Tuple[dict, int]:
""" Creates a new room.
Args:
@@ -530,9 +538,9 @@ class RoomCreationHandler(BaseHandler):
`avatar_url` and/or `displayname`.
Returns:
- Deferred[dict]:
- a dict containing the keys `room_id` and, if an alias was
- requested, `room_alias`.
+ First, a dict containing the keys `room_id` and, if an alias
+ was, requested, `room_alias`. Secondly, the stream_id of the
+ last persisted event.
Raises:
SynapseError if the room ID couldn't be stored, or something went
horribly wrong.
@@ -664,7 +672,7 @@ class RoomCreationHandler(BaseHandler):
# override any attempt to set room versions via the creation_content
creation_content["room_version"] = room_version.identifier
- await self._send_events_for_new_room(
+ last_stream_id = await self._send_events_for_new_room(
requester,
room_id,
preset_config=preset_config,
@@ -678,7 +686,10 @@ class RoomCreationHandler(BaseHandler):
if "name" in config:
name = config["name"]
- await self.event_creation_handler.create_and_send_nonmember_event(
+ (
+ _,
+ last_stream_id,
+ ) = await self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.Name,
@@ -692,7 +703,10 @@ class RoomCreationHandler(BaseHandler):
if "topic" in config:
topic = config["topic"]
- await self.event_creation_handler.create_and_send_nonmember_event(
+ (
+ _,
+ last_stream_id,
+ ) = await self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.Topic,
@@ -710,7 +724,7 @@ class RoomCreationHandler(BaseHandler):
if is_direct:
content["is_direct"] = is_direct
- await self.room_member_handler.update_membership(
+ _, last_stream_id = await self.room_member_handler.update_membership(
requester,
UserID.from_string(invitee),
room_id,
@@ -724,7 +738,7 @@ class RoomCreationHandler(BaseHandler):
id_access_token = invite_3pid.get("id_access_token") # optional
address = invite_3pid["address"]
medium = invite_3pid["medium"]
- await self.hs.get_room_member_handler().do_3pid_invite(
+ last_stream_id = await self.hs.get_room_member_handler().do_3pid_invite(
room_id,
requester.user,
medium,
@@ -740,7 +754,12 @@ class RoomCreationHandler(BaseHandler):
if room_alias:
result["room_alias"] = room_alias.to_string()
- return result
+ # Always wait for room creation to progate before returning
+ await self._replication.wait_for_stream_position(
+ self.hs.config.worker.writers.events, "events", last_stream_id
+ )
+
+ return result, last_stream_id
async def _send_events_for_new_room(
self,
@@ -753,7 +772,13 @@ class RoomCreationHandler(BaseHandler):
room_alias=None,
power_level_content_override=None, # Doesn't apply when initial state has power level state event content
creator_join_profile=None,
- ):
+ ) -> int:
+ """Sends the initial events into a new room.
+
+ Returns:
+ The stream_id of the last event persisted.
+ """
+
def create(etype, content, **kwargs):
e = {"type": etype, "content": content}
@@ -762,12 +787,16 @@ class RoomCreationHandler(BaseHandler):
return e
- async def send(etype, content, **kwargs):
+ async def send(etype, content, **kwargs) -> int:
event = create(etype, content, **kwargs)
logger.debug("Sending %s in new room", etype)
- await self.event_creation_handler.create_and_send_nonmember_event(
+ (
+ _,
+ last_stream_id,
+ ) = await self.event_creation_handler.create_and_send_nonmember_event(
creator, event, ratelimit=False
)
+ return last_stream_id
config = RoomCreationHandler.PRESETS_DICT[preset_config]
@@ -792,7 +821,9 @@ class RoomCreationHandler(BaseHandler):
# of the first events that get sent into a room.
pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None)
if pl_content is not None:
- await send(etype=EventTypes.PowerLevels, content=pl_content)
+ last_sent_stream_id = await send(
+ etype=EventTypes.PowerLevels, content=pl_content
+ )
else:
power_level_content = {
"users": {creator_id: 100},
@@ -825,33 +856,39 @@ class RoomCreationHandler(BaseHandler):
if power_level_content_override:
power_level_content.update(power_level_content_override)
- await send(etype=EventTypes.PowerLevels, content=power_level_content)
+ last_sent_stream_id = await send(
+ etype=EventTypes.PowerLevels, content=power_level_content
+ )
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
- await send(
+ last_sent_stream_id = await send(
etype=EventTypes.CanonicalAlias,
content={"alias": room_alias.to_string()},
)
if (EventTypes.JoinRules, "") not in initial_state:
- await send(
+ last_sent_stream_id = await send(
etype=EventTypes.JoinRules, content={"join_rule": config["join_rules"]}
)
if (EventTypes.RoomHistoryVisibility, "") not in initial_state:
- await send(
+ last_sent_stream_id = await send(
etype=EventTypes.RoomHistoryVisibility,
content={"history_visibility": config["history_visibility"]},
)
if config["guest_can_join"]:
if (EventTypes.GuestAccess, "") not in initial_state:
- await send(
+ last_sent_stream_id = await send(
etype=EventTypes.GuestAccess, content={"guest_access": "can_join"}
)
for (etype, state_key), content in initial_state.items():
- await send(etype=etype, state_key=state_key, content=content)
+ last_sent_stream_id = await send(
+ etype=etype, state_key=state_key, content=content
+ )
+
+ return last_sent_stream_id
async def _generate_room_id(
self, creator_id: str, is_public: str, room_version: RoomVersion,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 619252b761..cfd6efd1fc 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,7 +17,7 @@
import abc
import logging
-from typing import Dict, Iterable, List, Optional, Tuple, Union
+from typing import Dict, Iterable, List, Optional, Tuple
from six.moves import http_client
@@ -26,6 +26,9 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
+from synapse.replication.http.membership import (
+ ReplicationLocallyRejectInviteRestServlet,
+)
from synapse.types import Collection, Requester, RoomAlias, RoomID, UserID
from synapse.util.async_helpers import Linearizer
from synapse.util.distributor import user_joined_room, user_left_room
@@ -44,11 +47,6 @@ class RoomMemberHandler(object):
__metaclass__ = abc.ABCMeta
def __init__(self, hs):
- """
-
- Args:
- hs (synapse.server.HomeServer):
- """
self.hs = hs
self.store = hs.get_datastore()
self.auth = hs.get_auth()
@@ -72,6 +70,17 @@ class RoomMemberHandler(object):
self._enable_lookup = hs.config.enable_3pid_lookup
self.allow_per_room_profiles = self.config.allow_per_room_profiles
+ self._event_stream_writer_instance = hs.config.worker.writers.events
+ self._is_on_event_persistence_instance = (
+ self._event_stream_writer_instance == hs.get_instance_name()
+ )
+ if self._is_on_event_persistence_instance:
+ self.persist_event_storage = hs.get_storage().persistence
+ else:
+ self._locally_reject_client = ReplicationLocallyRejectInviteRestServlet.make_client(
+ hs
+ )
+
# This is only used to get at ratelimit function, and
# maybe_kick_guest_users. It's fine there are multiple of these as
# it doesn't store state.
@@ -85,7 +94,7 @@ class RoomMemberHandler(object):
room_id: str,
user: UserID,
content: dict,
- ) -> Optional[dict]:
+ ) -> Tuple[str, int]:
"""Try and join a room that this server is not in
Args:
@@ -105,7 +114,7 @@ class RoomMemberHandler(object):
room_id: str,
target: UserID,
content: dict,
- ) -> dict:
+ ) -> Tuple[Optional[str], int]:
"""Attempt to reject an invite for a room this server is not in. If we
fail to do so we locally mark the invite as rejected.
@@ -122,6 +131,22 @@ class RoomMemberHandler(object):
"""
raise NotImplementedError()
+ async def locally_reject_invite(self, user_id: str, room_id: str) -> int:
+ """Mark the invite has having been rejected even though we failed to
+ create a leave event for it.
+ """
+ if self._is_on_event_persistence_instance:
+ return await self.persist_event_storage.locally_reject_invite(
+ user_id, room_id
+ )
+ else:
+ result = await self._locally_reject_client(
+ instance_name=self._event_stream_writer_instance,
+ user_id=user_id,
+ room_id=room_id,
+ )
+ return result["stream_id"]
+
@abc.abstractmethod
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Notifies distributor on master process that the user has joined the
@@ -155,7 +180,7 @@ class RoomMemberHandler(object):
ratelimit: bool = True,
content: Optional[dict] = None,
require_consent: bool = True,
- ) -> EventBase:
+ ) -> Tuple[str, int]:
user_id = target.to_string()
if content is None:
@@ -188,9 +213,10 @@ class RoomMemberHandler(object):
)
if duplicate is not None:
# Discard the new event since this membership change is a no-op.
- return duplicate
+ _, stream_id = await self.store.get_event_ordering(duplicate.event_id)
+ return duplicate.event_id, stream_id
- await self.event_creation_handler.handle_new_client_event(
+ stream_id = await self.event_creation_handler.handle_new_client_event(
requester, event, context, extra_users=[target], ratelimit=ratelimit
)
@@ -214,7 +240,7 @@ class RoomMemberHandler(object):
if prev_member_event.membership == Membership.JOIN:
await self._user_left_room(target, room_id)
- return event
+ return event.event_id, stream_id
async def copy_room_tags_and_direct_to_room(
self, old_room_id, new_room_id, user_id
@@ -264,7 +290,7 @@ class RoomMemberHandler(object):
ratelimit: bool = True,
content: Optional[dict] = None,
require_consent: bool = True,
- ) -> Union[EventBase, Optional[dict]]:
+ ) -> Tuple[Optional[str], int]:
key = (room_id,)
as_id = object()
@@ -314,7 +340,7 @@ class RoomMemberHandler(object):
ratelimit: bool = True,
content: Optional[dict] = None,
require_consent: bool = True,
- ) -> Union[EventBase, Optional[dict]]:
+ ) -> Tuple[Optional[str], int]:
content_specified = bool(content)
if content is None:
content = {}
@@ -418,7 +444,13 @@ class RoomMemberHandler(object):
same_membership = old_membership == effective_membership_state
same_sender = requester.user.to_string() == old_state.sender
if same_sender and same_membership and same_content:
- return old_state
+ _, stream_id = await self.store.get_event_ordering(
+ old_state.event_id
+ )
+ return (
+ old_state.event_id,
+ stream_id,
+ )
if old_membership in ["ban", "leave"] and action == "kick":
raise AuthError(403, "The target user is not in the room")
@@ -725,7 +757,7 @@ class RoomMemberHandler(object):
requester: Requester,
txn_id: Optional[str],
id_access_token: Optional[str] = None,
- ) -> None:
+ ) -> int:
if self.config.block_non_admin_invites:
is_requester_admin = await self.auth.is_server_admin(requester.user)
if not is_requester_admin:
@@ -757,11 +789,11 @@ class RoomMemberHandler(object):
)
if invitee:
- await self.update_membership(
+ _, stream_id = await self.update_membership(
requester, UserID.from_string(invitee), room_id, "invite", txn_id=txn_id
)
else:
- await self._make_and_store_3pid_invite(
+ stream_id = await self._make_and_store_3pid_invite(
requester,
id_server,
medium,
@@ -772,6 +804,8 @@ class RoomMemberHandler(object):
id_access_token=id_access_token,
)
+ return stream_id
+
async def _make_and_store_3pid_invite(
self,
requester: Requester,
@@ -782,7 +816,7 @@ class RoomMemberHandler(object):
user: UserID,
txn_id: Optional[str],
id_access_token: Optional[str] = None,
- ) -> None:
+ ) -> int:
room_state = await self.state_handler.get_current_state(room_id)
inviter_display_name = ""
@@ -837,7 +871,10 @@ class RoomMemberHandler(object):
id_access_token=id_access_token,
)
- await self.event_creation_handler.create_and_send_nonmember_event(
+ (
+ event,
+ stream_id,
+ ) = await self.event_creation_handler.create_and_send_nonmember_event(
requester,
{
"type": EventTypes.ThirdPartyInvite,
@@ -855,6 +892,7 @@ class RoomMemberHandler(object):
ratelimit=False,
txn_id=txn_id,
)
+ return stream_id
async def _is_host_in_room(
self, current_state_ids: Dict[Tuple[str, str], str]
@@ -936,7 +974,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
room_id: str,
user: UserID,
content: dict,
- ) -> None:
+ ) -> Tuple[str, int]:
"""Implements RoomMemberHandler._remote_join
"""
# filter ourselves out of remote_room_hosts: do_invite_join ignores it
@@ -965,7 +1003,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
# join dance for now, since we're kinda implicitly checking
# that we are allowed to join when we decide whether or not we
# need to do the invite/join dance.
- await self.federation_handler.do_invite_join(
+ event_id, stream_id = await self.federation_handler.do_invite_join(
remote_room_hosts, room_id, user.to_string(), content
)
await self._user_joined_room(user, room_id)
@@ -975,14 +1013,14 @@ class RoomMemberMasterHandler(RoomMemberHandler):
if self.hs.config.limit_remote_rooms.enabled:
if too_complex is False:
# We checked, and we're under the limit.
- return
+ return event_id, stream_id
# Check again, but with the local state events
too_complex = await self._is_local_room_too_complex(room_id)
if too_complex is False:
# We're under the limit.
- return
+ return event_id, stream_id
# The room is too large. Leave.
requester = types.create_requester(user, None, False, None)
@@ -995,6 +1033,8 @@ class RoomMemberMasterHandler(RoomMemberHandler):
errcode=Codes.RESOURCE_LIMIT_EXCEEDED,
)
+ return event_id, stream_id
+
async def _remote_reject_invite(
self,
requester: Requester,
@@ -1002,15 +1042,15 @@ class RoomMemberMasterHandler(RoomMemberHandler):
room_id: str,
target: UserID,
content: dict,
- ) -> dict:
+ ) -> Tuple[Optional[str], int]:
"""Implements RoomMemberHandler._remote_reject_invite
"""
fed_handler = self.federation_handler
try:
- ret = await fed_handler.do_remotely_reject_invite(
+ event, stream_id = await fed_handler.do_remotely_reject_invite(
remote_room_hosts, room_id, target.to_string(), content=content,
)
- return ret
+ return event.event_id, stream_id
except Exception as e:
# if we were unable to reject the exception, just mark
# it as rejected on our end and plough ahead.
@@ -1020,8 +1060,8 @@ class RoomMemberMasterHandler(RoomMemberHandler):
#
logger.warning("Failed to reject invite: %s", e)
- await self.store.locally_reject_invite(target.to_string(), room_id)
- return {}
+ stream_id = await self.locally_reject_invite(target.to_string(), room_id)
+ return None, stream_id
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 5c776cc0be..02e0c4103d 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -14,7 +14,7 @@
# limitations under the License.
import logging
-from typing import List, Optional
+from typing import List, Optional, Tuple
from synapse.api.errors import SynapseError
from synapse.handlers.room_member import RoomMemberHandler
@@ -43,7 +43,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
room_id: str,
user: UserID,
content: dict,
- ) -> Optional[dict]:
+ ) -> Tuple[str, int]:
"""Implements RoomMemberHandler._remote_join
"""
if len(remote_room_hosts) == 0:
@@ -59,7 +59,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
await self._user_joined_room(user, room_id)
- return ret
+ return ret["event_id"], ret["stream_id"]
async def _remote_reject_invite(
self,
@@ -68,16 +68,17 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
room_id: str,
target: UserID,
content: dict,
- ) -> dict:
+ ) -> Tuple[Optional[str], int]:
"""Implements RoomMemberHandler._remote_reject_invite
"""
- return await self._remote_reject_client(
+ ret = await self._remote_reject_client(
requester=requester,
remote_room_hosts=remote_room_hosts,
room_id=room_id,
user_id=target.to_string(),
content=content,
)
+ return ret["event_id"], ret["stream_id"]
async def _user_joined_room(self, target: UserID, room_id: str) -> None:
"""Implements RoomMemberHandler._user_joined_room
|