summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/__init__.py4
-rw-r--r--synapse/replication/http/device.py64
-rw-r--r--synapse/replication/http/registration.py65
-rw-r--r--synapse/replication/slave/storage/client_ips.py2
-rw-r--r--synapse/replication/tcp/streams.py6
5 files changed, 137 insertions, 4 deletions
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py

index 19f214281e..dec63ae68d 100644 --- a/synapse/replication/http/__init__.py +++ b/synapse/replication/http/__init__.py
@@ -14,7 +14,7 @@ # limitations under the License. from synapse.http.server import JsonResource -from synapse.replication.http import federation, membership, send_event +from synapse.replication.http import federation, membership, registration, send_event, device REPLICATION_PREFIX = "/_synapse/replication" @@ -28,3 +28,5 @@ class ReplicationRestResource(JsonResource): send_event.register_servlets(hs, self) membership.register_servlets(hs, self) federation.register_servlets(hs, self) + registration.register_servlets(hs, self) + device.register_servlets(hs, self) diff --git a/synapse/replication/http/device.py b/synapse/replication/http/device.py new file mode 100644
index 0000000000..605de028a0 --- /dev/null +++ b/synapse/replication/http/device.py
@@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint + +logger = logging.getLogger(__name__) + + +class CheckDeviceRegisteredServlet(ReplicationEndpoint): + """ + Check a device is registered. + + """ + + NAME = "device_check_registered" + PATH_ARGS = ("user_id") + + def __init__(self, hs): + super(CheckDeviceRegisteredServlet, self).__init__(hs) + self.device_handler = hs.get_device_handler() + + @staticmethod + def _serialize_payload(user_id, device_id, initial_display_name): + """ + """ + return { + "device_id": device_id, + "initial_display_name": initial_display_name, + } + + @defer.inlineCallbacks + def _handle_request(self, request, user_id): + content = parse_json_object_from_request(request) + + device_id = content["device_id"] + initial_display_name = content["initial_display_name"] + + try: + device_id = yield self.device_handler.check_device_registered(user_id, device_id) + except Exception as e: + defer.returnValue((400, str(e))) + + defer.returnValue((200, {"device_id": device_id})) + + +def register_servlets(hs, http_server): + CheckDeviceRegisteredServlet(hs).register(http_server) diff --git a/synapse/replication/http/registration.py b/synapse/replication/http/registration.py new file mode 100644
index 0000000000..0f2f226ae1 --- /dev/null +++ b/synapse/replication/http/registration.py
@@ -0,0 +1,65 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from synapse.replication.http._base import ReplicationEndpoint + +logger = logging.getLogger(__name__) + + +class RegistrationUserCacheInvalidationServlet(ReplicationEndpoint): + """ + Invalidate the caches that a registration usually invalidates. + + Request format: + + POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id + + {} + """ + + NAME = "reg_invalidate_user_caches" + PATH_ARGS = ("user_id",) + + def __init__(self, hs): + super(RegistrationUserCacheInvalidationServlet, self).__init__(hs) + self.store = hs.get_datastore() + + @staticmethod + def _serialize_payload(user_id, args): + """ + Args: + user_id (str) + """ + return {} + + @defer.inlineCallbacks + def _handle_request(self, request, user_id): + + def invalidate(txn): + self.store._invalidate_cache_and_stream( + txn, self.store.get_user_by_id, (user_id,) + ) + txn.call_after(self.store.is_guest.invalidate, (user_id,)) + + yield self.store.runInteraction("user_invalidate_caches", invalidate) + defer.returnValue((200, {})) + + +def register_servlets(hs, http_server): + RegistrationUserCacheInvalidationServlet(hs).register(http_server) diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py
index 60641f1a49..5b8521c770 100644 --- a/synapse/replication/slave/storage/client_ips.py +++ b/synapse/replication/slave/storage/client_ips.py
@@ -43,6 +43,8 @@ class SlavedClientIpStore(BaseSlavedStore): if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: return + self.client_ip_last_seen.prefill(key, now) + self.hs.get_tcp_replication().send_user_ip( user_id, access_token, ip, user_agent, device_id, now ) diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index c1e626be3f..d49973634e 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py
@@ -32,7 +32,7 @@ from twisted.internet import defer logger = logging.getLogger(__name__) -MAX_EVENTS_BEHIND = 10000 +MAX_EVENTS_BEHIND = 500000 EventStreamRow = namedtuple("EventStreamRow", ( @@ -265,8 +265,8 @@ class PresenceStream(Stream): store = hs.get_datastore() presence_handler = hs.get_presence_handler() - self.current_token = store.get_current_presence_token - self.update_function = presence_handler.get_all_presence_updates + self.current_token = lambda: 0 + self.update_function = lambda _a, _b: [] super(PresenceStream, self).__init__(hs)