diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 0e83453851..40f3d24678 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -37,14 +37,15 @@ class DeviceHandler(BaseHandler):
self.state = hs.get_state_handler()
self._auth_handler = hs.get_auth_handler()
self.federation_sender = hs.get_federation_sender()
- self.federation = hs.get_replication_layer()
self._edu_updater = DeviceListEduUpdater(hs, self)
- self.federation.register_edu_handler(
+ federation_registry = hs.get_federation_registry()
+
+ federation_registry.register_edu_handler(
"m.device_list_update", self._edu_updater.incoming_device_list_update,
)
- self.federation.register_query_handler(
+ federation_registry.register_query_handler(
"user_devices", self.on_federation_query_user_devices,
)
@@ -430,7 +431,7 @@ class DeviceListEduUpdater(object):
def __init__(self, hs, device_handler):
self.store = hs.get_datastore()
- self.federation = hs.get_replication_layer()
+ self.federation = hs.get_federation_client()
self.clock = hs.get_clock()
self.device_handler = device_handler
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index d996aa90bb..f147a20b73 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -37,7 +37,7 @@ class DeviceMessageHandler(object):
self.is_mine = hs.is_mine
self.federation = hs.get_federation_sender()
- hs.get_replication_layer().register_edu_handler(
+ hs.get_federation_registry().register_edu_handler(
"m.direct_to_device", self.on_direct_to_device_edu
)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 8580ada60a..c5b6e75e03 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -36,8 +36,8 @@ class DirectoryHandler(BaseHandler):
self.appservice_handler = hs.get_application_service_handler()
self.event_creation_handler = hs.get_event_creation_handler()
- self.federation = hs.get_replication_layer()
- self.federation.register_query_handler(
+ self.federation = hs.get_federation_client()
+ hs.get_federation_registry().register_query_handler(
"directory", self.on_directory_query
)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 9aa95f89e6..31b1ece13e 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -32,7 +32,7 @@ logger = logging.getLogger(__name__)
class E2eKeysHandler(object):
def __init__(self, hs):
self.store = hs.get_datastore()
- self.federation = hs.get_replication_layer()
+ self.federation = hs.get_federation_client()
self.device_handler = hs.get_device_handler()
self.is_mine = hs.is_mine
self.clock = hs.get_clock()
@@ -40,7 +40,7 @@ class E2eKeysHandler(object):
# doesn't really work as part of the generic query API, because the
# query request requires an object POST, but we abuse the
# "query handler" interface.
- self.federation.register_query_handler(
+ hs.get_federation_registry().register_query_handler(
"client_keys", self.on_federation_query_client_keys
)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 520612683e..080aca3d71 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -68,7 +68,7 @@ class FederationHandler(BaseHandler):
self.hs = hs
self.store = hs.get_datastore()
- self.replication_layer = hs.get_replication_layer()
+ self.replication_layer = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
@@ -78,8 +78,6 @@ class FederationHandler(BaseHandler):
self.spam_checker = hs.get_spam_checker()
self.event_creation_handler = hs.get_event_creation_handler()
- self.replication_layer.set_handler(self)
-
# When joining a room we need to queue any events for that room up
self.room_queues = {}
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index dd00d8a86c..4f97c8db79 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -13,7 +13,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+from twisted.internet import defer, reactor
+from twisted.python.failure import Failure
from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError, Codes, SynapseError
@@ -24,9 +25,10 @@ from synapse.types import (
UserID, RoomAlias, RoomStreamToken,
)
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
-from synapse.util.logcontext import preserve_fn
+from synapse.util.logcontext import preserve_fn, run_in_background
from synapse.util.metrics import measure_func
from synapse.util.frozenutils import unfreeze
+from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
from synapse.replication.http.send_event import send_event_to_master
@@ -41,6 +43,36 @@ import ujson
logger = logging.getLogger(__name__)
+class PurgeStatus(object):
+ """Object tracking the status of a purge request
+
+ This class contains information on the progress of a purge request, for
+ return by get_purge_status.
+
+ Attributes:
+ status (int): Tracks whether this request has completed. One of
+ STATUS_{ACTIVE,COMPLETE,FAILED}
+ """
+
+ STATUS_ACTIVE = 0
+ STATUS_COMPLETE = 1
+ STATUS_FAILED = 2
+
+ STATUS_TEXT = {
+ STATUS_ACTIVE: "active",
+ STATUS_COMPLETE: "complete",
+ STATUS_FAILED: "failed",
+ }
+
+ def __init__(self):
+ self.status = PurgeStatus.STATUS_ACTIVE
+
+ def asdict(self):
+ return {
+ "status": PurgeStatus.STATUS_TEXT[self.status]
+ }
+
+
class MessageHandler(BaseHandler):
def __init__(self, hs):
@@ -50,15 +82,88 @@ class MessageHandler(BaseHandler):
self.clock = hs.get_clock()
self.pagination_lock = ReadWriteLock()
+ self._purges_in_progress_by_room = set()
+ # map from purge id to PurgeStatus
+ self._purges_by_id = {}
- @defer.inlineCallbacks
- def purge_history(self, room_id, topological_ordering,
- delete_local_events=False):
- with (yield self.pagination_lock.write(room_id)):
- yield self.store.purge_history(
- room_id, topological_ordering, delete_local_events,
+ def start_purge_history(self, room_id, topological_ordering,
+ delete_local_events=False):
+ """Start off a history purge on a room.
+
+ Args:
+ room_id (str): The room to purge from
+
+ topological_ordering (int): minimum topo ordering to preserve
+ delete_local_events (bool): True to delete local events as well as
+ remote ones
+
+ Returns:
+ str: unique ID for this purge transaction.
+ """
+ if room_id in self._purges_in_progress_by_room:
+ raise SynapseError(
+ 400,
+ "History purge already in progress for %s" % (room_id, ),
)
+ purge_id = random_string(16)
+
+ # we log the purge_id here so that it can be tied back to the
+ # request id in the log lines.
+ logger.info("[purge] starting purge_id %s", purge_id)
+
+ self._purges_by_id[purge_id] = PurgeStatus()
+ run_in_background(
+ self._purge_history,
+ purge_id, room_id, topological_ordering, delete_local_events,
+ )
+ return purge_id
+
+ @defer.inlineCallbacks
+ def _purge_history(self, purge_id, room_id, topological_ordering,
+ delete_local_events):
+ """Carry out a history purge on a room.
+
+ Args:
+ purge_id (str): The id for this purge
+ room_id (str): The room to purge from
+ topological_ordering (int): minimum topo ordering to preserve
+ delete_local_events (bool): True to delete local events as well as
+ remote ones
+
+ Returns:
+ Deferred
+ """
+ self._purges_in_progress_by_room.add(room_id)
+ try:
+ with (yield self.pagination_lock.write(room_id)):
+ yield self.store.purge_history(
+ room_id, topological_ordering, delete_local_events,
+ )
+ logger.info("[purge] complete")
+ self._purges_by_id[purge_id].status = PurgeStatus.STATUS_COMPLETE
+ except Exception:
+ logger.error("[purge] failed: %s", Failure().getTraceback().rstrip())
+ self._purges_by_id[purge_id].status = PurgeStatus.STATUS_FAILED
+ finally:
+ self._purges_in_progress_by_room.discard(room_id)
+
+ # remove the purge from the list 24 hours after it completes
+ def clear_purge():
+ del self._purges_by_id[purge_id]
+ reactor.callLater(24 * 3600, clear_purge)
+
+ def get_purge_status(self, purge_id):
+ """Get the current status of an active purge
+
+ Args:
+ purge_id (str): purge_id returned by start_purge_history
+
+ Returns:
+ PurgeStatus|None
+ """
+ return self._purges_by_id.get(purge_id)
+
@defer.inlineCallbacks
def get_messages(self, requester, room_id=None, pagin_config=None,
as_client_event=True, event_filter=None):
@@ -562,7 +667,7 @@ class EventCreationHandler(object):
event (FrozenEvent)
context (EventContext)
ratelimit (bool)
- extra_users (list(str)): Any extra users to notify about event
+ extra_users (list(UserID)): Any extra users to notify about event
"""
try:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index cb158ba962..a5e501897c 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -93,29 +93,30 @@ class PresenceHandler(object):
self.store = hs.get_datastore()
self.wheel_timer = WheelTimer()
self.notifier = hs.get_notifier()
- self.replication = hs.get_replication_layer()
self.federation = hs.get_federation_sender()
self.state = hs.get_state_handler()
- self.replication.register_edu_handler(
+ federation_registry = hs.get_federation_registry()
+
+ federation_registry.register_edu_handler(
"m.presence", self.incoming_presence
)
- self.replication.register_edu_handler(
+ federation_registry.register_edu_handler(
"m.presence_invite",
lambda origin, content: self.invite_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
- self.replication.register_edu_handler(
+ federation_registry.register_edu_handler(
"m.presence_accept",
lambda origin, content: self.accept_presence(
observed_user=UserID.from_string(content["observed_user"]),
observer_user=UserID.from_string(content["observer_user"]),
)
)
- self.replication.register_edu_handler(
+ federation_registry.register_edu_handler(
"m.presence_deny",
lambda origin, content: self.deny_presence(
observed_user=UserID.from_string(content["observed_user"]),
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index c9c2879038..cb710fe796 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -31,8 +31,8 @@ class ProfileHandler(BaseHandler):
def __init__(self, hs):
super(ProfileHandler, self).__init__(hs)
- self.federation = hs.get_replication_layer()
- self.federation.register_query_handler(
+ self.federation = hs.get_federation_client()
+ hs.get_federation_registry().register_query_handler(
"profile", self.on_profile_query
)
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 0525765272..3f215c2b4e 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -35,7 +35,7 @@ class ReceiptsHandler(BaseHandler):
self.store = hs.get_datastore()
self.hs = hs
self.federation = hs.get_federation_sender()
- hs.get_replication_layer().register_edu_handler(
+ hs.get_federation_registry().register_edu_handler(
"m.receipt", self._received_remote_receipt
)
self.clock = self.hs.get_clock()
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 9021d4d57f..ed5939880a 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -446,16 +446,34 @@ class RegistrationHandler(BaseHandler):
return self.hs.get_auth_handler()
@defer.inlineCallbacks
- def guest_access_token_for(self, medium, address, inviter_user_id):
+ def get_or_register_3pid_guest(self, medium, address, inviter_user_id):
+ """Get a guest access token for a 3PID, creating a guest account if
+ one doesn't already exist.
+
+ Args:
+ medium (str)
+ address (str)
+ inviter_user_id (str): The user ID who is trying to invite the
+ 3PID
+
+ Returns:
+ Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
+ 3PID guest account.
+ """
access_token = yield self.store.get_3pid_guest_access_token(medium, address)
if access_token:
- defer.returnValue(access_token)
+ user_info = yield self.auth.get_user_by_access_token(
+ access_token
+ )
- _, access_token = yield self.register(
+ defer.returnValue((user_info["user"].to_string(), access_token))
+
+ user_id, access_token = yield self.register(
generate_token=True,
make_guest=True
)
access_token = yield self.store.save_or_get_3pid_guest_access_token(
medium, address, access_token, inviter_user_id
)
- defer.returnValue(access_token)
+
+ defer.returnValue((user_id, access_token))
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index dfa09141ed..5d81f59b44 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -409,7 +409,7 @@ class RoomListHandler(BaseHandler):
def _get_remote_list_cached(self, server_name, limit=None, since_token=None,
search_filter=None, include_all_networks=False,
third_party_instance_id=None,):
- repl_layer = self.hs.get_replication_layer()
+ repl_layer = self.hs.get_federation_client()
if search_filter:
# We can't cache when asking for search
return repl_layer.get_public_rooms(
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ed3b97730d..0127cf4166 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -55,7 +55,6 @@ class RoomMemberHandler(object):
self.registration_handler = hs.get_handlers().registration_handler
self.profile_handler = hs.get_profile_handler()
self.event_creation_hander = hs.get_event_creation_handler()
- self.replication_layer = hs.get_replication_layer()
self.member_linearizer = Linearizer(name="member")
@@ -138,7 +137,20 @@ class RoomMemberHandler(object):
defer.returnValue(event)
@defer.inlineCallbacks
- def remote_join(self, remote_room_hosts, room_id, user, content):
+ def _remote_join(self, remote_room_hosts, room_id, user, content):
+ """Try and join a room that this server is not in
+
+ Args:
+ remote_room_hosts (list[str]): List of servers that can be used
+ to join via.
+ room_id (str): Room that we are trying to join
+ user (UserID): User who is trying to join
+ content (dict): A dict that should be used as the content of the
+ join event.
+
+ Returns:
+ Deferred
+ """
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")
@@ -155,6 +167,43 @@ class RoomMemberHandler(object):
yield user_joined_room(self.distributor, user, room_id)
@defer.inlineCallbacks
+ def _remote_reject_invite(self, remote_room_hosts, room_id, target):
+ """Attempt to reject an invite for a room this server is not in. If we
+ fail to do so we locally mark the invite as rejected.
+
+ Args:
+ remote_room_hosts (list[str]): List of servers to use to try and
+ reject invite
+ room_id (str)
+ target (UserID): The user rejecting the invite
+
+ Returns:
+ Deferred[dict]: A dictionary to be returned to the client, may
+ include event_id etc, or nothing if we locally rejected
+ """
+ fed_handler = self.federation_handler
+ try:
+ ret = yield fed_handler.do_remotely_reject_invite(
+ remote_room_hosts,
+ room_id,
+ target.to_string(),
+ )
+ defer.returnValue(ret)
+ except Exception as e:
+ # if we were unable to reject the exception, just mark
+ # it as rejected on our end and plough ahead.
+ #
+ # The 'except' clause is very broad, but we need to
+ # capture everything from DNS failures upwards
+ #
+ logger.warn("Failed to reject invite: %s", e)
+
+ yield self.store.locally_reject_invite(
+ target.to_string(), room_id
+ )
+ defer.returnValue({})
+
+ @defer.inlineCallbacks
def update_membership(
self,
requester,
@@ -212,7 +261,7 @@ class RoomMemberHandler(object):
# if this is a join with a 3pid signature, we may need to turn a 3pid
# invite into a normal invite before we can handle the join.
if third_party_signed is not None:
- yield self.replication_layer.exchange_third_party_invite(
+ yield self.federation_handler.exchange_third_party_invite(
third_party_signed["sender"],
target.to_string(),
room_id,
@@ -292,7 +341,7 @@ class RoomMemberHandler(object):
raise AuthError(403, "Guest access not allowed")
if not is_host_in_room:
- inviter = yield self.get_inviter(target.to_string(), room_id)
+ inviter = yield self._get_inviter(target.to_string(), room_id)
if inviter and not self.hs.is_mine(inviter):
remote_room_hosts.append(inviter.domain)
@@ -306,7 +355,7 @@ class RoomMemberHandler(object):
if requester.is_guest:
content["kind"] = "guest"
- ret = yield self.remote_join(
+ ret = yield self._remote_join(
remote_room_hosts, room_id, target, content
)
defer.returnValue(ret)
@@ -314,7 +363,7 @@ class RoomMemberHandler(object):
elif effective_membership_state == Membership.LEAVE:
if not is_host_in_room:
# perhaps we've been invited
- inviter = yield self.get_inviter(target.to_string(), room_id)
+ inviter = yield self._get_inviter(target.to_string(), room_id)
if not inviter:
raise SynapseError(404, "Not a known room")
@@ -328,28 +377,10 @@ class RoomMemberHandler(object):
else:
# send the rejection to the inviter's HS.
remote_room_hosts = remote_room_hosts + [inviter.domain]
- fed_handler = self.federation_handler
- try:
- ret = yield fed_handler.do_remotely_reject_invite(
- remote_room_hosts,
- room_id,
- target.to_string(),
- )
- defer.returnValue(ret)
- except Exception as e:
- # if we were unable to reject the exception, just mark
- # it as rejected on our end and plough ahead.
- #
- # The 'except' clause is very broad, but we need to
- # capture everything from DNS failures upwards
- #
- logger.warn("Failed to reject invite: %s", e)
-
- yield self.store.locally_reject_invite(
- target.to_string(), room_id
- )
-
- defer.returnValue({})
+ res = yield self._remote_reject_invite(
+ remote_room_hosts, room_id, target,
+ )
+ defer.returnValue(res)
res = yield self._local_membership_update(
requester=requester,
@@ -496,7 +527,7 @@ class RoomMemberHandler(object):
defer.returnValue((RoomID.from_string(room_id), servers))
@defer.inlineCallbacks
- def get_inviter(self, user_id, room_id):
+ def _get_inviter(self, user_id, room_id):
invite = yield self.store.get_invite_for_user_in_room(
user_id=user_id,
room_id=room_id,
@@ -573,7 +604,7 @@ class RoomMemberHandler(object):
if "mxid" in data:
if "signatures" not in data:
raise AuthError(401, "No signatures on 3pid binding")
- yield self.verify_any_signature(data, id_server)
+ yield self._verify_any_signature(data, id_server)
defer.returnValue(data["mxid"])
except IOError as e:
@@ -581,7 +612,7 @@ class RoomMemberHandler(object):
defer.returnValue(None)
@defer.inlineCallbacks
- def verify_any_signature(self, data, server_hostname):
+ def _verify_any_signature(self, data, server_hostname):
if server_hostname not in data["signatures"]:
raise AuthError(401, "No signature from server %s" % (server_hostname,))
for key_name, signature in data["signatures"][server_hostname].items():
@@ -735,20 +766,16 @@ class RoomMemberHandler(object):
}
if self.config.invite_3pid_guest:
- registration_handler = self.registration_handler
- guest_access_token = yield registration_handler.guest_access_token_for(
+ rh = self.registration_handler
+ guest_user_id, guest_access_token = yield rh.get_or_register_3pid_guest(
medium=medium,
address=address,
inviter_user_id=inviter_user_id,
)
- guest_user_info = yield self.auth.get_user_by_access_token(
- guest_access_token
- )
-
invite_config.update({
"guest_access_token": guest_access_token,
- "guest_user_id": guest_user_info["user"].to_string(),
+ "guest_user_id": guest_user_id,
})
data = yield self.simple_http_client.post_urlencoded_get_json(
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 82dedbbc99..77c0cf146f 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -56,7 +56,7 @@ class TypingHandler(object):
self.federation = hs.get_federation_sender()
- hs.get_replication_layer().register_edu_handler("m.typing", self._recv_edu)
+ hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
hs.get_distributor().observe("user_left_room", self.user_left_room)
|