diff options
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/http/__init__.py | 4 | ||||
-rw-r--r-- | synapse/replication/http/_base.py | 5 | ||||
-rw-r--r-- | synapse/replication/http/federation.py | 8 | ||||
-rw-r--r-- | synapse/replication/http/login.py | 74 | ||||
-rw-r--r-- | synapse/replication/http/membership.py | 4 | ||||
-rw-r--r-- | synapse/replication/http/register.py | 146 | ||||
-rw-r--r-- | synapse/replication/http/send_event.py | 8 | ||||
-rw-r--r-- | synapse/replication/slave/storage/_base.py | 14 | ||||
-rw-r--r-- | synapse/replication/tcp/client.py | 2 | ||||
-rw-r--r-- | synapse/replication/tcp/protocol.py | 4 |
10 files changed, 251 insertions, 18 deletions
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py index 19f214281e..81b85352b1 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py @@ -14,7 +14,7 @@ # limitations under the License. from synapse.http.server import JsonResource -from synapse.replication.http import federation, membership, send_event +from synapse.replication.http import federation, login, membership, register, send_event REPLICATION_PREFIX = "/_synapse/replication" @@ -28,3 +28,5 @@ class ReplicationRestResource(JsonResource): send_event.register_servlets(hs, self) membership.register_servlets(hs, self) federation.register_servlets(hs, self) + login.register_servlets(hs, self) + register.register_servlets(hs, self) diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index 5e5376cf58..e81456ab2b 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -127,7 +127,10 @@ class ReplicationEndpoint(object): def send_request(**kwargs): data = yield cls._serialize_payload(**kwargs) - url_args = [urllib.parse.quote(kwargs[name]) for name in cls.PATH_ARGS] + url_args = [ + urllib.parse.quote(kwargs[name], safe='') + for name in cls.PATH_ARGS + ] if cls.CACHE: txn_id = random_string(10) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 64a79da162..0f0a07c422 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -17,7 +17,7 @@ import logging from twisted.internet import defer -from synapse.events import FrozenEvent +from synapse.events import event_type_from_format_version from synapse.events.snapshot import EventContext from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint @@ -70,6 +70,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): event_payloads.append({ "event": event.get_pdu_json(), + "event_format_version": event.format_version, "internal_metadata": event.internal_metadata.get_dict(), "rejected_reason": event.rejected_reason, "context": serialized_context, @@ -94,9 +95,12 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): event_and_contexts = [] for event_payload in event_payloads: event_dict = event_payload["event"] + format_ver = event_payload["event_format_version"] internal_metadata = event_payload["internal_metadata"] rejected_reason = event_payload["rejected_reason"] - event = FrozenEvent(event_dict, internal_metadata, rejected_reason) + + EventType = event_type_from_format_version(format_ver) + event = EventType(event_dict, internal_metadata, rejected_reason) context = yield EventContext.deserialize( self.store, event_payload["context"], diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py new file mode 100644 index 0000000000..63bc0405ea --- /dev/null +++ b/synapse/replication/http/login.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint + +logger = logging.getLogger(__name__) + + +class RegisterDeviceReplicationServlet(ReplicationEndpoint): + """Ensure a device is registered, generating a new access token for the + device. + + Used during registration and login. + """ + + NAME = "device_check_registered" + PATH_ARGS = ("user_id",) + + def __init__(self, hs): + super(RegisterDeviceReplicationServlet, self).__init__(hs) + self.registration_handler = hs.get_registration_handler() + + @staticmethod + def _serialize_payload(user_id, device_id, initial_display_name, is_guest): + """ + Args: + device_id (str|None): Device ID to use, if None a new one is + generated. + initial_display_name (str|None) + is_guest (bool) + """ + return { + "device_id": device_id, + "initial_display_name": initial_display_name, + "is_guest": is_guest, + } + + @defer.inlineCallbacks + def _handle_request(self, request, user_id): + content = parse_json_object_from_request(request) + + device_id = content["device_id"] + initial_display_name = content["initial_display_name"] + is_guest = content["is_guest"] + + device_id, access_token = yield self.registration_handler.register_device( + user_id, device_id, initial_display_name, is_guest, + ) + + defer.returnValue((200, { + "device_id": device_id, + "access_token": access_token, + })) + + +def register_servlets(hs, http_server): + RegisterDeviceReplicationServlet(hs).register(http_server) diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index e58bebf12a..81a2b204c7 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -191,7 +191,7 @@ class ReplicationRegister3PIDGuestRestServlet(ReplicationEndpoint): def __init__(self, hs): super(ReplicationRegister3PIDGuestRestServlet, self).__init__(hs) - self.registeration_handler = hs.get_handlers().registration_handler + self.registeration_handler = hs.get_registration_handler() self.store = hs.get_datastore() self.clock = hs.get_clock() @@ -251,7 +251,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): def __init__(self, hs): super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__(hs) - self.registeration_handler = hs.get_handlers().registration_handler + self.registeration_handler = hs.get_registration_handler() self.store = hs.get_datastore() self.clock = hs.get_clock() self.distributor = hs.get_distributor() diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py new file mode 100644 index 0000000000..1d27c9221f --- /dev/null +++ b/synapse/replication/http/register.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint + +logger = logging.getLogger(__name__) + + +class ReplicationRegisterServlet(ReplicationEndpoint): + """Register a new user + """ + + NAME = "register_user" + PATH_ARGS = ("user_id",) + + def __init__(self, hs): + super(ReplicationRegisterServlet, self).__init__(hs) + self.store = hs.get_datastore() + + @staticmethod + def _serialize_payload( + user_id, token, password_hash, was_guest, make_guest, appservice_id, + create_profile_with_displayname, admin, user_type, + ): + """ + Args: + user_id (str): The desired user ID to register. + token (str): The desired access token to use for this user. If this + is not None, the given access token is associated with the user + id. + password_hash (str|None): Optional. The password hash for this user. + was_guest (bool): Optional. Whether this is a guest account being + upgraded to a non-guest account. + make_guest (boolean): True if the the new user should be guest, + false to add a regular user account. + appservice_id (str|None): The ID of the appservice registering the user. + create_profile_with_displayname (unicode|None): Optionally create a + profile for the user, setting their displayname to the given value + admin (boolean): is an admin user? + user_type (str|None): type of user. One of the values from + api.constants.UserTypes, or None for a normal user. + """ + return { + "token": token, + "password_hash": password_hash, + "was_guest": was_guest, + "make_guest": make_guest, + "appservice_id": appservice_id, + "create_profile_with_displayname": create_profile_with_displayname, + "admin": admin, + "user_type": user_type, + } + + @defer.inlineCallbacks + def _handle_request(self, request, user_id): + content = parse_json_object_from_request(request) + + yield self.store.register( + user_id=user_id, + token=content["token"], + password_hash=content["password_hash"], + was_guest=content["was_guest"], + make_guest=content["make_guest"], + appservice_id=content["appservice_id"], + create_profile_with_displayname=content["create_profile_with_displayname"], + admin=content["admin"], + user_type=content["user_type"], + ) + + defer.returnValue((200, {})) + + +class ReplicationPostRegisterActionsServlet(ReplicationEndpoint): + """Run any post registration actions + """ + + NAME = "post_register" + PATH_ARGS = ("user_id",) + + def __init__(self, hs): + super(ReplicationPostRegisterActionsServlet, self).__init__(hs) + self.store = hs.get_datastore() + self.registration_handler = hs.get_registration_handler() + + @staticmethod + def _serialize_payload(user_id, auth_result, access_token, bind_email, + bind_msisdn): + """ + Args: + user_id (str): The user ID that consented + auth_result (dict): The authenticated credentials of the newly + registered user. + access_token (str|None): The access token of the newly logged in + device, or None if `inhibit_login` enabled. + bind_email (bool): Whether to bind the email with the identity + server + bind_msisdn (bool): Whether to bind the msisdn with the identity + server + """ + return { + "auth_result": auth_result, + "access_token": access_token, + "bind_email": bind_email, + "bind_msisdn": bind_msisdn, + } + + @defer.inlineCallbacks + def _handle_request(self, request, user_id): + content = parse_json_object_from_request(request) + + auth_result = content["auth_result"] + access_token = content["access_token"] + bind_email = content["bind_email"] + bind_msisdn = content["bind_msisdn"] + + yield self.registration_handler.post_registration_actions( + user_id=user_id, + auth_result=auth_result, + access_token=access_token, + bind_email=bind_email, + bind_msisdn=bind_msisdn, + ) + + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + ReplicationRegisterServlet(hs).register(http_server) + ReplicationPostRegisterActionsServlet(hs).register(http_server) diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index 5b52c91650..3635015eda 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -17,7 +17,7 @@ import logging from twisted.internet import defer -from synapse.events import FrozenEvent +from synapse.events import event_type_from_format_version from synapse.events.snapshot import EventContext from synapse.http.servlet import parse_json_object_from_request from synapse.replication.http._base import ReplicationEndpoint @@ -74,6 +74,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): payload = { "event": event.get_pdu_json(), + "event_format_version": event.format_version, "internal_metadata": event.internal_metadata.get_dict(), "rejected_reason": event.rejected_reason, "context": serialized_context, @@ -90,9 +91,12 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint): content = parse_json_object_from_request(request) event_dict = content["event"] + format_ver = content["event_format_version"] internal_metadata = content["internal_metadata"] rejected_reason = content["rejected_reason"] - event = FrozenEvent(event_dict, internal_metadata, rejected_reason) + + EventType = event_type_from_format_version(format_ver) + event = EventType(event_dict, internal_metadata, rejected_reason) requester = Requester.deserialize(self.store, content["requester"]) context = yield EventContext.deserialize(self.store, content["context"]) diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index 2d81d49e9a..817d1f67f9 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -17,7 +17,7 @@ import logging import six -from synapse.storage._base import SQLBaseStore +from synapse.storage._base import _CURRENT_STATE_CACHE_NAME, SQLBaseStore from synapse.storage.engines import PostgresEngine from ._slaved_id_tracker import SlavedIdTracker @@ -54,12 +54,12 @@ class BaseSlavedStore(SQLBaseStore): if stream_name == "caches": self._cache_id_gen.advance(token) for row in rows: - try: - getattr(self, row.cache_func).invalidate(tuple(row.keys)) - except AttributeError: - # We probably haven't pulled in the cache in this worker, - # which is fine. - pass + if row.cache_func == _CURRENT_STATE_CACHE_NAME: + room_id = row.keys[0] + members_changed = set(row.keys[1:]) + self._invalidate_state_caches(room_id, members_changed) + else: + self._attempt_to_invalidate_cache(row.cache_func, tuple(row.keys)) def _invalidate_cache_and_stream(self, txn, cache_func, keys): txn.call_after(cache_func.invalidate, keys) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index cbe9645817..586dddb40b 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -106,7 +106,7 @@ class ReplicationClientHandler(object): Can be overriden in subclasses to handle more. """ - logger.info("Received rdata %s -> %s", stream_name, token) + logger.debug("Received rdata %s -> %s", stream_name, token) return self.store.process_replication_rows(stream_name, token, rows) def on_position(self, stream_name, token): diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 5dc7b3fffc..0b3fe6cbf5 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -656,7 +656,7 @@ tcp_inbound_commands = LaterGauge( "", ["command", "name"], lambda: { - (k[0], p.name,): count + (k, p.name,): count for p in connected_connections for k, count in iteritems(p.inbound_commands_counter) }, @@ -667,7 +667,7 @@ tcp_outbound_commands = LaterGauge( "", ["command", "name"], lambda: { - (k[0], p.name,): count + (k, p.name,): count for p in connected_connections for k, count in iteritems(p.outbound_commands_counter) }, |