diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 19f214281e..81b85352b1 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -14,7 +14,7 @@
# limitations under the License.
from synapse.http.server import JsonResource
-from synapse.replication.http import federation, membership, send_event
+from synapse.replication.http import federation, login, membership, register, send_event
REPLICATION_PREFIX = "/_synapse/replication"
@@ -28,3 +28,5 @@ class ReplicationRestResource(JsonResource):
send_event.register_servlets(hs, self)
membership.register_servlets(hs, self)
federation.register_servlets(hs, self)
+ login.register_servlets(hs, self)
+ register.register_servlets(hs, self)
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 5e5376cf58..e81456ab2b 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -127,7 +127,10 @@ class ReplicationEndpoint(object):
def send_request(**kwargs):
data = yield cls._serialize_payload(**kwargs)
- url_args = [urllib.parse.quote(kwargs[name]) for name in cls.PATH_ARGS]
+ url_args = [
+ urllib.parse.quote(kwargs[name], safe='')
+ for name in cls.PATH_ARGS
+ ]
if cls.CACHE:
txn_id = random_string(10)
diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py
index 64a79da162..0f0a07c422 100644
--- a/synapse/replication/http/federation.py
+++ b/synapse/replication/http/federation.py
@@ -17,7 +17,7 @@ import logging
from twisted.internet import defer
-from synapse.events import FrozenEvent
+from synapse.events import event_type_from_format_version
from synapse.events.snapshot import EventContext
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
@@ -70,6 +70,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
event_payloads.append({
"event": event.get_pdu_json(),
+ "event_format_version": event.format_version,
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
@@ -94,9 +95,12 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
event_and_contexts = []
for event_payload in event_payloads:
event_dict = event_payload["event"]
+ format_ver = event_payload["event_format_version"]
internal_metadata = event_payload["internal_metadata"]
rejected_reason = event_payload["rejected_reason"]
- event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
+
+ EventType = event_type_from_format_version(format_ver)
+ event = EventType(event_dict, internal_metadata, rejected_reason)
context = yield EventContext.deserialize(
self.store, event_payload["context"],
diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py
new file mode 100644
index 0000000000..63bc0405ea
--- /dev/null
+++ b/synapse/replication/http/login.py
@@ -0,0 +1,74 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+
+logger = logging.getLogger(__name__)
+
+
+class RegisterDeviceReplicationServlet(ReplicationEndpoint):
+ """Ensure a device is registered, generating a new access token for the
+ device.
+
+ Used during registration and login.
+ """
+
+ NAME = "device_check_registered"
+ PATH_ARGS = ("user_id",)
+
+ def __init__(self, hs):
+ super(RegisterDeviceReplicationServlet, self).__init__(hs)
+ self.registration_handler = hs.get_registration_handler()
+
+ @staticmethod
+ def _serialize_payload(user_id, device_id, initial_display_name, is_guest):
+ """
+ Args:
+ device_id (str|None): Device ID to use, if None a new one is
+ generated.
+ initial_display_name (str|None)
+ is_guest (bool)
+ """
+ return {
+ "device_id": device_id,
+ "initial_display_name": initial_display_name,
+ "is_guest": is_guest,
+ }
+
+ @defer.inlineCallbacks
+ def _handle_request(self, request, user_id):
+ content = parse_json_object_from_request(request)
+
+ device_id = content["device_id"]
+ initial_display_name = content["initial_display_name"]
+ is_guest = content["is_guest"]
+
+ device_id, access_token = yield self.registration_handler.register_device(
+ user_id, device_id, initial_display_name, is_guest,
+ )
+
+ defer.returnValue((200, {
+ "device_id": device_id,
+ "access_token": access_token,
+ }))
+
+
+def register_servlets(hs, http_server):
+ RegisterDeviceReplicationServlet(hs).register(http_server)
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index e58bebf12a..81a2b204c7 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -191,7 +191,7 @@ class ReplicationRegister3PIDGuestRestServlet(ReplicationEndpoint):
def __init__(self, hs):
super(ReplicationRegister3PIDGuestRestServlet, self).__init__(hs)
- self.registeration_handler = hs.get_handlers().registration_handler
+ self.registeration_handler = hs.get_registration_handler()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@@ -251,7 +251,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
def __init__(self, hs):
super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__(hs)
- self.registeration_handler = hs.get_handlers().registration_handler
+ self.registeration_handler = hs.get_registration_handler()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.distributor = hs.get_distributor()
diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py
new file mode 100644
index 0000000000..1d27c9221f
--- /dev/null
+++ b/synapse/replication/http/register.py
@@ -0,0 +1,146 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationRegisterServlet(ReplicationEndpoint):
+ """Register a new user
+ """
+
+ NAME = "register_user"
+ PATH_ARGS = ("user_id",)
+
+ def __init__(self, hs):
+ super(ReplicationRegisterServlet, self).__init__(hs)
+ self.store = hs.get_datastore()
+
+ @staticmethod
+ def _serialize_payload(
+ user_id, token, password_hash, was_guest, make_guest, appservice_id,
+ create_profile_with_displayname, admin, user_type,
+ ):
+ """
+ Args:
+ user_id (str): The desired user ID to register.
+ token (str): The desired access token to use for this user. If this
+ is not None, the given access token is associated with the user
+ id.
+ password_hash (str|None): Optional. The password hash for this user.
+ was_guest (bool): Optional. Whether this is a guest account being
+ upgraded to a non-guest account.
+ make_guest (boolean): True if the the new user should be guest,
+ false to add a regular user account.
+ appservice_id (str|None): The ID of the appservice registering the user.
+ create_profile_with_displayname (unicode|None): Optionally create a
+ profile for the user, setting their displayname to the given value
+ admin (boolean): is an admin user?
+ user_type (str|None): type of user. One of the values from
+ api.constants.UserTypes, or None for a normal user.
+ """
+ return {
+ "token": token,
+ "password_hash": password_hash,
+ "was_guest": was_guest,
+ "make_guest": make_guest,
+ "appservice_id": appservice_id,
+ "create_profile_with_displayname": create_profile_with_displayname,
+ "admin": admin,
+ "user_type": user_type,
+ }
+
+ @defer.inlineCallbacks
+ def _handle_request(self, request, user_id):
+ content = parse_json_object_from_request(request)
+
+ yield self.store.register(
+ user_id=user_id,
+ token=content["token"],
+ password_hash=content["password_hash"],
+ was_guest=content["was_guest"],
+ make_guest=content["make_guest"],
+ appservice_id=content["appservice_id"],
+ create_profile_with_displayname=content["create_profile_with_displayname"],
+ admin=content["admin"],
+ user_type=content["user_type"],
+ )
+
+ defer.returnValue((200, {}))
+
+
+class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
+ """Run any post registration actions
+ """
+
+ NAME = "post_register"
+ PATH_ARGS = ("user_id",)
+
+ def __init__(self, hs):
+ super(ReplicationPostRegisterActionsServlet, self).__init__(hs)
+ self.store = hs.get_datastore()
+ self.registration_handler = hs.get_registration_handler()
+
+ @staticmethod
+ def _serialize_payload(user_id, auth_result, access_token, bind_email,
+ bind_msisdn):
+ """
+ Args:
+ user_id (str): The user ID that consented
+ auth_result (dict): The authenticated credentials of the newly
+ registered user.
+ access_token (str|None): The access token of the newly logged in
+ device, or None if `inhibit_login` enabled.
+ bind_email (bool): Whether to bind the email with the identity
+ server
+ bind_msisdn (bool): Whether to bind the msisdn with the identity
+ server
+ """
+ return {
+ "auth_result": auth_result,
+ "access_token": access_token,
+ "bind_email": bind_email,
+ "bind_msisdn": bind_msisdn,
+ }
+
+ @defer.inlineCallbacks
+ def _handle_request(self, request, user_id):
+ content = parse_json_object_from_request(request)
+
+ auth_result = content["auth_result"]
+ access_token = content["access_token"]
+ bind_email = content["bind_email"]
+ bind_msisdn = content["bind_msisdn"]
+
+ yield self.registration_handler.post_registration_actions(
+ user_id=user_id,
+ auth_result=auth_result,
+ access_token=access_token,
+ bind_email=bind_email,
+ bind_msisdn=bind_msisdn,
+ )
+
+ defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+ ReplicationRegisterServlet(hs).register(http_server)
+ ReplicationPostRegisterActionsServlet(hs).register(http_server)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 5b52c91650..3635015eda 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -17,7 +17,7 @@ import logging
from twisted.internet import defer
-from synapse.events import FrozenEvent
+from synapse.events import event_type_from_format_version
from synapse.events.snapshot import EventContext
from synapse.http.servlet import parse_json_object_from_request
from synapse.replication.http._base import ReplicationEndpoint
@@ -74,6 +74,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
payload = {
"event": event.get_pdu_json(),
+ "event_format_version": event.format_version,
"internal_metadata": event.internal_metadata.get_dict(),
"rejected_reason": event.rejected_reason,
"context": serialized_context,
@@ -90,9 +91,12 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
content = parse_json_object_from_request(request)
event_dict = content["event"]
+ format_ver = content["event_format_version"]
internal_metadata = content["internal_metadata"]
rejected_reason = content["rejected_reason"]
- event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
+
+ EventType = event_type_from_format_version(format_ver)
+ event = EventType(event_dict, internal_metadata, rejected_reason)
requester = Requester.deserialize(self.store, content["requester"])
context = yield EventContext.deserialize(self.store, content["context"])
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 2d81d49e9a..817d1f67f9 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -17,7 +17,7 @@ import logging
import six
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import _CURRENT_STATE_CACHE_NAME, SQLBaseStore
from synapse.storage.engines import PostgresEngine
from ._slaved_id_tracker import SlavedIdTracker
@@ -54,12 +54,12 @@ class BaseSlavedStore(SQLBaseStore):
if stream_name == "caches":
self._cache_id_gen.advance(token)
for row in rows:
- try:
- getattr(self, row.cache_func).invalidate(tuple(row.keys))
- except AttributeError:
- # We probably haven't pulled in the cache in this worker,
- # which is fine.
- pass
+ if row.cache_func == _CURRENT_STATE_CACHE_NAME:
+ room_id = row.keys[0]
+ members_changed = set(row.keys[1:])
+ self._invalidate_state_caches(room_id, members_changed)
+ else:
+ self._attempt_to_invalidate_cache(row.cache_func, tuple(row.keys))
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
txn.call_after(cache_func.invalidate, keys)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index cbe9645817..586dddb40b 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -106,7 +106,7 @@ class ReplicationClientHandler(object):
Can be overriden in subclasses to handle more.
"""
- logger.info("Received rdata %s -> %s", stream_name, token)
+ logger.debug("Received rdata %s -> %s", stream_name, token)
return self.store.process_replication_rows(stream_name, token, rows)
def on_position(self, stream_name, token):
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 5dc7b3fffc..0b3fe6cbf5 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -656,7 +656,7 @@ tcp_inbound_commands = LaterGauge(
"",
["command", "name"],
lambda: {
- (k[0], p.name,): count
+ (k, p.name,): count
for p in connected_connections
for k, count in iteritems(p.inbound_commands_counter)
},
@@ -667,7 +667,7 @@ tcp_outbound_commands = LaterGauge(
"",
["command", "name"],
lambda: {
- (k[0], p.name,): count
+ (k, p.name,): count
for p in connected_connections
for k, count in iteritems(p.outbound_commands_counter)
},
|