diff --git a/synapse/replication/http/login.py b/synapse/replication/http/login.py
index 1590eca317..63bc0405ea 100644
--- a/synapse/replication/http/login.py
+++ b/synapse/replication/http/login.py
@@ -35,7 +35,7 @@ class RegisterDeviceReplicationServlet(ReplicationEndpoint):
def __init__(self, hs):
super(RegisterDeviceReplicationServlet, self).__init__(hs)
- self.registration_handler = hs.get_handlers().registration_handler
+ self.registration_handler = hs.get_registration_handler()
@staticmethod
def _serialize_payload(user_id, device_id, initial_display_name, is_guest):
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index e58bebf12a..81a2b204c7 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -191,7 +191,7 @@ class ReplicationRegister3PIDGuestRestServlet(ReplicationEndpoint):
def __init__(self, hs):
super(ReplicationRegister3PIDGuestRestServlet, self).__init__(hs)
- self.registeration_handler = hs.get_handlers().registration_handler
+ self.registeration_handler = hs.get_registration_handler()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
@@ -251,7 +251,7 @@ class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
def __init__(self, hs):
super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__(hs)
- self.registeration_handler = hs.get_handlers().registration_handler
+ self.registeration_handler = hs.get_registration_handler()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.distributor = hs.get_distributor()
diff --git a/synapse/replication/http/register.py b/synapse/replication/http/register.py
index bdaf37396c..1d27c9221f 100644
--- a/synapse/replication/http/register.py
+++ b/synapse/replication/http/register.py
@@ -87,5 +87,60 @@ class ReplicationRegisterServlet(ReplicationEndpoint):
defer.returnValue((200, {}))
+class ReplicationPostRegisterActionsServlet(ReplicationEndpoint):
+ """Run any post registration actions
+ """
+
+ NAME = "post_register"
+ PATH_ARGS = ("user_id",)
+
+ def __init__(self, hs):
+ super(ReplicationPostRegisterActionsServlet, self).__init__(hs)
+ self.store = hs.get_datastore()
+ self.registration_handler = hs.get_registration_handler()
+
+ @staticmethod
+ def _serialize_payload(user_id, auth_result, access_token, bind_email,
+ bind_msisdn):
+ """
+ Args:
+ user_id (str): The user ID that consented
+ auth_result (dict): The authenticated credentials of the newly
+ registered user.
+ access_token (str|None): The access token of the newly logged in
+ device, or None if `inhibit_login` enabled.
+ bind_email (bool): Whether to bind the email with the identity
+ server
+ bind_msisdn (bool): Whether to bind the msisdn with the identity
+ server
+ """
+ return {
+ "auth_result": auth_result,
+ "access_token": access_token,
+ "bind_email": bind_email,
+ "bind_msisdn": bind_msisdn,
+ }
+
+ @defer.inlineCallbacks
+ def _handle_request(self, request, user_id):
+ content = parse_json_object_from_request(request)
+
+ auth_result = content["auth_result"]
+ access_token = content["access_token"]
+ bind_email = content["bind_email"]
+ bind_msisdn = content["bind_msisdn"]
+
+ yield self.registration_handler.post_registration_actions(
+ user_id=user_id,
+ auth_result=auth_result,
+ access_token=access_token,
+ bind_email=bind_email,
+ bind_msisdn=bind_msisdn,
+ )
+
+ defer.returnValue((200, {}))
+
+
def register_servlets(hs, http_server):
ReplicationRegisterServlet(hs).register(http_server)
+ ReplicationPostRegisterActionsServlet(hs).register(http_server)
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 2d81d49e9a..1353a32d00 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -17,7 +17,7 @@ import logging
import six
-from synapse.storage._base import SQLBaseStore
+from synapse.storage._base import _CURRENT_STATE_CACHE_NAME, SQLBaseStore
from synapse.storage.engines import PostgresEngine
from ._slaved_id_tracker import SlavedIdTracker
@@ -54,12 +54,17 @@ class BaseSlavedStore(SQLBaseStore):
if stream_name == "caches":
self._cache_id_gen.advance(token)
for row in rows:
- try:
- getattr(self, row.cache_func).invalidate(tuple(row.keys))
- except AttributeError:
- # We probably haven't pulled in the cache in this worker,
- # which is fine.
- pass
+ if row.cache_func == _CURRENT_STATE_CACHE_NAME:
+ room_id = row.keys[0]
+ members_changed = set(row.keys[1:])
+ self._invalidate_state_caches(room_id, members_changed)
+ else:
+ try:
+ getattr(self, row.cache_func).invalidate(tuple(row.keys))
+ except AttributeError:
+ # We probably haven't pulled in the cache in this worker,
+ # which is fine.
+ pass
def _invalidate_cache_and_stream(self, txn, cache_func, keys):
txn.call_after(cache_func.invalidate, keys)
|