diff options
Diffstat (limited to 'synapse/replication')
-rw-r--r-- | synapse/replication/slave/storage/appservice.py | 5 | ||||
-rw-r--r-- | synapse/replication/slave/storage/client_ips.py | 48 | ||||
-rw-r--r-- | synapse/replication/slave/storage/devices.py | 2 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 5 | ||||
-rw-r--r-- | synapse/replication/tcp/client.py | 7 | ||||
-rw-r--r-- | synapse/replication/tcp/commands.py | 32 | ||||
-rw-r--r-- | synapse/replication/tcp/protocol.py | 6 | ||||
-rw-r--r-- | synapse/replication/tcp/resource.py | 13 | ||||
-rw-r--r-- | synapse/replication/tcp/streams.py | 22 |
9 files changed, 137 insertions, 3 deletions
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py index a374f2f1a2..0d3f31a50c 100644 --- a/synapse/replication/slave/storage/appservice.py +++ b/synapse/replication/slave/storage/appservice.py @@ -16,6 +16,7 @@ from ._base import BaseSlavedStore from synapse.storage import DataStore from synapse.config.appservice import load_appservices +from synapse.storage.appservice import _make_exclusive_regex class SlavedApplicationServiceStore(BaseSlavedStore): @@ -25,6 +26,7 @@ class SlavedApplicationServiceStore(BaseSlavedStore): hs.config.server_name, hs.config.app_service_config_files ) + self.exclusive_user_regex = _make_exclusive_regex(self.services_cache) get_app_service_by_token = DataStore.get_app_service_by_token.__func__ get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__ @@ -38,3 +40,6 @@ class SlavedApplicationServiceStore(BaseSlavedStore): get_appservice_state = DataStore.get_appservice_state.__func__ set_appservice_last_pos = DataStore.set_appservice_last_pos.__func__ set_appservice_state = DataStore.set_appservice_state.__func__ + get_if_app_services_interested_in_user = ( + DataStore.get_if_app_services_interested_in_user.__func__ + ) diff --git a/synapse/replication/slave/storage/client_ips.py b/synapse/replication/slave/storage/client_ips.py new file mode 100644 index 0000000000..65250285e8 --- /dev/null +++ b/synapse/replication/slave/storage/client_ips.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ._base import BaseSlavedStore +from synapse.storage.client_ips import LAST_SEEN_GRANULARITY +from synapse.util.caches import CACHE_SIZE_FACTOR +from synapse.util.caches.descriptors import Cache + + +class SlavedClientIpStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(SlavedClientIpStore, self).__init__(db_conn, hs) + + self.client_ip_last_seen = Cache( + name="client_ip_last_seen", + keylen=4, + max_entries=50000 * CACHE_SIZE_FACTOR, + ) + + def insert_client_ip(self, user, access_token, ip, user_agent, device_id): + now = int(self._clock.time_msec()) + user_id = user.to_string() + key = (user_id, access_token, ip) + + try: + last_seen = self.client_ip_last_seen.get(key) + except KeyError: + last_seen = None + + # Rate-limited inserts + if last_seen is not None and (now - last_seen) < LAST_SEEN_GRANULARITY: + return + + self.hs.get_tcp_replication().send_user_ip( + user_id, access_token, ip, user_agent, device_id, now + ) diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index 4d4a435471..7687867aee 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -16,6 +16,7 @@ from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker from synapse.storage import DataStore +from synapse.storage.end_to_end_keys import EndToEndKeyStore from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -45,6 +46,7 @@ class SlavedDeviceStore(BaseSlavedStore): _mark_as_sent_devices_by_remote_txn = ( DataStore._mark_as_sent_devices_by_remote_txn.__func__ ) + count_e2e_one_time_keys = EndToEndKeyStore.__dict__["count_e2e_one_time_keys"] def stream_positions(self): result = super(SlavedDeviceStore, self).stream_positions() diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index fcaf58b93b..94ebbffc1b 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -108,6 +108,8 @@ class SlavedEventStore(BaseSlavedStore): get_current_state_ids = ( StateStore.__dict__["get_current_state_ids"] ) + get_state_group_delta = StateStore.__dict__["get_state_group_delta"] + _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"] has_room_changed_since = DataStore.has_room_changed_since.__func__ get_unread_push_actions_for_user_in_range_for_http = ( @@ -151,8 +153,7 @@ class SlavedEventStore(BaseSlavedStore): get_room_events_stream_for_rooms = ( DataStore.get_room_events_stream_for_rooms.__func__ ) - is_host_joined = DataStore.is_host_joined.__func__ - _is_host_joined = RoomMemberStore.__dict__["_is_host_joined"] + is_host_joined = RoomMemberStore.__dict__["is_host_joined"] get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__ _set_before_and_after = staticmethod(DataStore._set_before_and_after) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 90fb6c1336..6d2513c4e2 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -20,6 +20,7 @@ from twisted.internet.protocol import ReconnectingClientFactory from .commands import ( FederationAckCommand, UserSyncCommand, RemovePusherCommand, InvalidateCacheCommand, + UserIpCommand, ) from .protocol import ClientReplicationStreamProtocol @@ -178,6 +179,12 @@ class ReplicationClientHandler(object): cmd = InvalidateCacheCommand(cache_func.__name__, keys) self.send_command(cmd) + def send_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen): + """Tell the master that the user made a request. + """ + cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen) + self.send_command(cmd) + def await_sync(self, data): """Returns a deferred that is resolved when we receive a SYNC command with given data. diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 84d2a2272a..a009214e43 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -304,6 +304,36 @@ class InvalidateCacheCommand(Command): return " ".join((self.cache_func, json.dumps(self.keys))) +class UserIpCommand(Command): + """Sent periodically when a worker sees activity from a client. + + Format:: + + USER_IP <user_id>, <access_token>, <ip>, <device_id>, <last_seen>, <user_agent> + """ + NAME = "USER_IP" + + def __init__(self, user_id, access_token, ip, user_agent, device_id, last_seen): + self.user_id = user_id + self.access_token = access_token + self.ip = ip + self.user_agent = user_agent + self.device_id = device_id + self.last_seen = last_seen + + @classmethod + def from_line(cls, line): + user_id, access_token, ip, device_id, last_seen, user_agent = line.split(" ", 5) + + return cls(user_id, access_token, ip, user_agent, device_id, int(last_seen)) + + def to_line(self): + return " ".join(( + self.user_id, self.access_token, self.ip, self.device_id, + str(self.last_seen), self.user_agent, + )) + + # Map of command name to command type. COMMAND_MAP = { cmd.NAME: cmd @@ -320,6 +350,7 @@ COMMAND_MAP = { SyncCommand, RemovePusherCommand, InvalidateCacheCommand, + UserIpCommand, ) } @@ -342,5 +373,6 @@ VALID_CLIENT_COMMANDS = ( FederationAckCommand.NAME, RemovePusherCommand.NAME, InvalidateCacheCommand.NAME, + UserIpCommand.NAME, ErrorCommand.NAME, ) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 9fee2a484b..062272f8dd 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -406,6 +406,12 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): def on_INVALIDATE_CACHE(self, cmd): self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys) + def on_USER_IP(self, cmd): + self.streamer.on_user_ip( + cmd.user_id, cmd.access_token, cmd.ip, cmd.user_agent, cmd.device_id, + cmd.last_seen, + ) + @defer.inlineCallbacks def subscribe_to_stream(self, stream_name, token): """Subscribe the remote to a streams. diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 8b2c4c3043..3ea3ca5a6f 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -35,6 +35,7 @@ user_sync_counter = metrics.register_counter("user_sync") federation_ack_counter = metrics.register_counter("federation_ack") remove_pusher_counter = metrics.register_counter("remove_pusher") invalidate_cache_counter = metrics.register_counter("invalidate_cache") +user_ip_cache_counter = metrics.register_counter("user_ip_cache") logger = logging.getLogger(__name__) @@ -67,6 +68,7 @@ class ReplicationStreamer(object): self.store = hs.get_datastore() self.presence_handler = hs.get_presence_handler() self.clock = hs.get_clock() + self.notifier = hs.get_notifier() # Current connections. self.connections = [] @@ -99,7 +101,7 @@ class ReplicationStreamer(object): if not hs.config.send_federation: self.federation_sender = hs.get_federation_sender() - hs.get_notifier().add_replication_callback(self.on_notifier_poke) + self.notifier.add_replication_callback(self.on_notifier_poke) # Keeps track of whether we are currently checking for updates self.is_looping = False @@ -237,6 +239,15 @@ class ReplicationStreamer(object): invalidate_cache_counter.inc() getattr(self.store, cache_func).invalidate(tuple(keys)) + @measure_func("repl.on_user_ip") + def on_user_ip(self, user_id, access_token, ip, user_agent, device_id, last_seen): + """The client saw a user request + """ + user_ip_cache_counter.inc() + self.store.insert_client_ip( + user_id, access_token, ip, user_agent, device_id, last_seen, + ) + def send_sync_to_all_connections(self, data): """Sends a SYNC command to all clients. diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 369d5f2428..fbafe12cc2 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -112,6 +112,12 @@ AccountDataStreamRow = namedtuple("AccountDataStream", ( "data_type", # str "data", # dict )) +CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( + "room_id", # str + "type", # str + "state_key", # str + "event_id", # str, optional +)) class Stream(object): @@ -443,6 +449,21 @@ class AccountDataStream(Stream): defer.returnValue(results) +class CurrentStateDeltaStream(Stream): + """Current state for a room was changed + """ + NAME = "current_state_deltas" + ROW_TYPE = CurrentStateDeltaStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_max_current_state_delta_stream_id + self.update_function = store.get_all_updated_current_state_deltas + + super(CurrentStateDeltaStream, self).__init__(hs) + + STREAMS_MAP = { stream.NAME: stream for stream in ( @@ -460,5 +481,6 @@ STREAMS_MAP = { FederationStream, TagAccountDataStream, AccountDataStream, + CurrentStateDeltaStream, ) } |