From f85a4152796d7ec39787b00fb4f177d682fe41fb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 22 May 2017 16:31:24 +0100 Subject: Add missing storage function to slave store --- synapse/replication/slave/storage/devices.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/replication') diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py index 4d4a435471..7687867aee 100644 --- a/synapse/replication/slave/storage/devices.py +++ b/synapse/replication/slave/storage/devices.py @@ -16,6 +16,7 @@ from ._base import BaseSlavedStore from ._slaved_id_tracker import SlavedIdTracker from synapse.storage import DataStore +from synapse.storage.end_to_end_keys import EndToEndKeyStore from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -45,6 +46,7 @@ class SlavedDeviceStore(BaseSlavedStore): _mark_as_sent_devices_by_remote_txn = ( DataStore._mark_as_sent_devices_by_remote_txn.__func__ ) + count_e2e_one_time_keys = EndToEndKeyStore.__dict__["count_e2e_one_time_keys"] def stream_positions(self): result = super(SlavedDeviceStore, self).stream_positions() -- cgit 1.5.1 From dfbda5e0250adfa762f5491c7efb2666866db034 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 25 May 2017 17:08:41 +0100 Subject: Faster cache for get_joined_hosts --- synapse/federation/transaction_queue.py | 2 + synapse/replication/slave/storage/events.py | 2 + synapse/state.py | 16 ++--- synapse/storage/roommember.py | 93 ++++++++++++++++++++++------- synapse/storage/state.py | 33 ++++++++++ 5 files changed, 117 insertions(+), 29 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index a15198e05d..4c25ef1106 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -187,6 +187,8 @@ class TransactionQueue(object): prev_id for prev_id, _ in event.prev_events ], ) + destinations = set(destinations) + logger.info("destinations: %r", destinations) if send_on_behalf_of is not None: # If we are sending the event on behalf of another server diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index fcaf58b93b..6cd3a843df 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -108,6 +108,8 @@ class SlavedEventStore(BaseSlavedStore): get_current_state_ids = ( StateStore.__dict__["get_current_state_ids"] ) + get_state_group_delta = DataStore.get_state_group_delta.__func__ + _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"] has_room_changed_since = DataStore.has_room_changed_since.__func__ get_unread_push_actions_for_user_in_range_for_http = ( diff --git a/synapse/state.py b/synapse/state.py index 3f93f9e27f..95dbe02e50 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -170,9 +170,7 @@ class StateHandler(object): latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) logger.debug("calling resolve_state_groups from get_current_user_in_room") entry = yield self.resolve_state_groups(room_id, latest_event_ids) - joined_users = yield self.store.get_joined_users_from_state( - room_id, entry.state_id, entry.state - ) + joined_users = yield self.store.get_joined_users_from_state(room_id, entry) defer.returnValue(joined_users) @defer.inlineCallbacks @@ -181,9 +179,9 @@ class StateHandler(object): latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) logger.debug("calling resolve_state_groups from get_current_hosts_in_room") entry = yield self.resolve_state_groups(room_id, latest_event_ids) - joined_hosts = yield self.store.get_joined_hosts( - room_id, entry.state_id, entry.state - ) + logger.info("State: %r", entry.state_group) + joined_hosts = yield self.store.get_joined_hosts(room_id, entry) + logger.info("returning: %r", joined_hosts) defer.returnValue(joined_hosts) @defer.inlineCallbacks @@ -307,11 +305,13 @@ class StateHandler(object): if len(group_names) == 1: name, state_list = state_groups_ids.items().pop() + prev_group, delta_ids = yield self.store.get_state_group_delta(name) + defer.returnValue(_StateCacheEntry( state=state_list, state_group=name, - prev_group=None, - delta_ids=None, + prev_group=prev_group, + delta_ids=delta_ids, )) with (yield self.resolve_linearizer.queue(group_names)): diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index 0829ae5bee..8c4a5f9f2e 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -18,6 +18,7 @@ from twisted.internet import defer from collections import namedtuple from ._base import SQLBaseStore +from synapse.util.async import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.stringutils import to_ascii @@ -392,7 +393,8 @@ class RoomMemberStore(SQLBaseStore): context=context, ) - def get_joined_users_from_state(self, room_id, state_group, state_ids): + def get_joined_users_from_state(self, room_id, state_entry): + state_group = state_entry.state_group if not state_group: # If state_group is None it means it has yet to be assigned a # state group, i.e. we need to make sure that calls with a state_group @@ -401,7 +403,7 @@ class RoomMemberStore(SQLBaseStore): state_group = object() return self._get_joined_users_from_context( - room_id, state_group, state_ids, + room_id, state_group, state_entry.state, context=state_entry, ) @cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True, @@ -534,7 +536,8 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(False) - def get_joined_hosts(self, room_id, state_group, state_ids): + def get_joined_hosts(self, room_id, state_entry): + state_group = state_entry.state_group if not state_group: # If state_group is None it means it has yet to be assigned a # state group, i.e. we need to make sure that calls with a state_group @@ -543,33 +546,21 @@ class RoomMemberStore(SQLBaseStore): state_group = object() return self._get_joined_hosts( - room_id, state_group, state_ids + room_id, state_group, state_entry.state, state_entry=state_entry, ) @cachedInlineCallbacks(num_args=2, max_entries=10000, iterable=True) - def _get_joined_hosts(self, room_id, state_group, current_state_ids): + # @defer.inlineCallbacks + def _get_joined_hosts(self, room_id, state_group, current_state_ids, state_entry): # We don't use `state_group`, its there so that we can cache based # on it. However, its important that its never None, since two current_state's # with a state_group of None are likely to be different. # See bulk_get_push_rules_for_room for how we work around this. assert state_group is not None - joined_hosts = set() - for etype, state_key in current_state_ids: - if etype == EventTypes.Member: - try: - host = get_domain_from_id(state_key) - except: - logger.warn("state_key not user_id: %s", state_key) - continue - - if host in joined_hosts: - continue - - event_id = current_state_ids[(etype, state_key)] - event = yield self.get_event(event_id, allow_none=True) - if event and event.content["membership"] == Membership.JOIN: - joined_hosts.add(intern_string(host)) + cache = self._get_joined_hosts_cache(room_id) + joined_hosts = yield cache.get_destinations(state_entry) + logger.info("returning: %r", joined_hosts) defer.returnValue(joined_hosts) @@ -647,3 +638,63 @@ class RoomMemberStore(SQLBaseStore): yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME) defer.returnValue(result) + + @cached(max_entries=10000, iterable=True) + def _get_joined_hosts_cache(self, room_id): + return _JoinedHostsCache(self, room_id) + + +class _JoinedHostsCache(object): + def __init__(self, store, room_id): + self.store = store + self.room_id = room_id + + self.hosts_to_joined_users = {} + + self.state_group = object() + + self.linearizer = Linearizer("_JoinedHostsCache") + + self._len = 0 + + @defer.inlineCallbacks + def get_destinations(self, state_entry): + if state_entry.state_group == self.state_group: + defer.returnValue(frozenset(self.hosts_to_joined_users)) + + with (yield self.linearizer.queue(())): + if state_entry.state_group == self.state_group: + pass + elif state_entry.prev_group == self.state_group: + for (typ, state_key), event_id in state_entry.delta_ids.iteritems(): + if typ != EventTypes.Member: + continue + + host = intern_string(get_domain_from_id(state_key)) + user_id = state_key + known_joins = self.hosts_to_joined_users.setdefault(host, set()) + + event = yield self.store.get_event(event_id) + if event.membership == Membership.JOIN: + known_joins.add(user_id) + else: + known_joins.discard(user_id) + + if not known_joins: + self.hosts_to_joined_users.pop(host, None) + else: + joined_users = yield self.store.get_joined_users_from_state( + self.room_id, state_entry, + ) + + self.hosts_to_joined_users = {} + for user_id in joined_users: + host = intern_string(get_domain_from_id(user_id)) + self.hosts_to_joined_users.setdefault(host, set()).add(user_id) + + self.state_group = state_entry.state_group + self._len = sum(len(v) for v in self.hosts_to_joined_users.itervalues()) + defer.returnValue(frozenset(self.hosts_to_joined_users)) + + def __len__(self): + return self._len diff --git a/synapse/storage/state.py b/synapse/storage/state.py index a7c3d401d4..01474ff5ff 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -98,6 +98,39 @@ class StateStore(SQLBaseStore): _get_current_state_ids_txn, ) + def get_state_group_delta(self, state_group): + def _get_state_group_delta_txn(txn): + prev_group = self._simple_select_one_onecol_txn( + txn, + table="state_group_edges", + keyvalues={ + "state_group": state_group, + }, + retcol="prev_state_group", + allow_none=True, + ) + + if not prev_group: + return None, None + + delta_ids = self._simple_select_list_txn( + txn, + table="state_groups_state", + keyvalues={ + "state_group": state_group, + }, + retcols=("type", "state_key", "event_id",) + ) + + return prev_group, { + (row["type"], row["state_key"]): row["event_id"] + for row in delta_ids + } + return self.runInteraction( + "get_state_group_delta", + _get_state_group_delta_txn, + ) + @defer.inlineCallbacks def get_state_groups_ids(self, room_id, event_ids): if not event_ids: -- cgit 1.5.1 From 298d83b34053a45beaa5ad1d202b3ca8e3b1bafe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Jun 2017 11:01:28 +0100 Subject: Fix replication --- synapse/replication/slave/storage/events.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse/replication') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 6cd3a843df..3f33d473cc 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -154,7 +154,6 @@ class SlavedEventStore(BaseSlavedStore): DataStore.get_room_events_stream_for_rooms.__func__ ) is_host_joined = DataStore.is_host_joined.__func__ - _is_host_joined = RoomMemberStore.__dict__["_is_host_joined"] get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__ _set_before_and_after = staticmethod(DataStore._set_before_and_after) -- cgit 1.5.1 From 2cac7623a5adfa84df6bd57d74d954daba3cc149 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Jun 2017 11:24:41 +0100 Subject: Add missing notifier --- synapse/replication/tcp/resource.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/replication') diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 8b2c4c3043..69c46911ec 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -67,6 +67,7 @@ class ReplicationStreamer(object): self.store = hs.get_datastore() self.presence_handler = hs.get_presence_handler() self.clock = hs.get_clock() + self.notifier = hs.get_notifier() # Current connections. self.connections = [] @@ -99,7 +100,7 @@ class ReplicationStreamer(object): if not hs.config.send_federation: self.federation_sender = hs.get_federation_sender() - hs.get_notifier().add_replication_callback(self.on_notifier_poke) + self.notifier.add_replication_callback(self.on_notifier_poke) # Keeps track of whether we are currently checking for updates self.is_looping = False -- cgit 1.5.1 From 80609743440467a69c4bdcd111c60d84f617d6c4 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 9 Jun 2017 16:40:52 +0100 Subject: Fix replication --- synapse/replication/slave/storage/events.py | 2 +- synapse/storage/state.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 6cd3a843df..f0a367aa9b 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -108,7 +108,7 @@ class SlavedEventStore(BaseSlavedStore): get_current_state_ids = ( StateStore.__dict__["get_current_state_ids"] ) - get_state_group_delta = DataStore.get_state_group_delta.__func__ + get_state_group_delta = StateStore.__dict__["get_state_group_delta"] _get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"] has_room_changed_since = DataStore.has_room_changed_since.__func__ diff --git a/synapse/storage/state.py b/synapse/storage/state.py index 24503cd5aa..d1e679719b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -37,7 +37,7 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delt __slots__ = [] def __len__(self): - return len(self.delta_ids) if self.delta_ids else None + return len(self.delta_ids) if self.delta_ids else 0 class StateStore(SQLBaseStore): -- cgit 1.5.1 From d53fe399ebb497f67efc99ebb12d96486503629a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 13 Jun 2017 09:56:18 +0100 Subject: Add cache for is_host_joined --- synapse/replication/slave/storage/events.py | 2 +- synapse/storage/events.py | 5 +++++ synapse/storage/roommember.py | 2 +- 3 files changed, 7 insertions(+), 2 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 7034f48b50..94ebbffc1b 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -153,7 +153,7 @@ class SlavedEventStore(BaseSlavedStore): get_room_events_stream_for_rooms = ( DataStore.get_room_events_stream_for_rooms.__func__ ) - is_host_joined = DataStore.is_host_joined.__func__ + is_host_joined = RoomMemberStore.__dict__["is_host_joined"] get_stream_token_for_event = DataStore.get_stream_token_for_event.__func__ _set_before_and_after = staticmethod(DataStore._set_before_and_after) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c80d181fc7..72ce84b0b8 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -776,6 +776,11 @@ class EventsStore(SQLBaseStore): txn, self.get_rooms_for_user, (member,) ) + for host in set(get_domain_from_id(u) for u in members_changed): + self._invalidate_cache_and_stream( + txn, self.is_host_joined, (room_id, host) + ) + self._invalidate_cache_and_stream( txn, self.get_users_in_room, (room_id,) ) diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py index e38bbd22a3..457ca288d0 100644 --- a/synapse/storage/roommember.py +++ b/synapse/storage/roommember.py @@ -501,7 +501,7 @@ class RoomMemberStore(SQLBaseStore): defer.returnValue(users_in_room) - @defer.inlineCallbacks + @cachedInlineCallbacks(max_entries=10000) def is_host_joined(self, room_id, host): if '%' in host or '_' in host: raise Exception("Invalid host name") -- cgit 1.5.1 From 6aa5bc86351a617546f0adacfebab3388716be3f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 15 Jun 2017 12:47:05 +0100 Subject: Initial worker impl --- synapse/app/federation_sender.py | 2 +- synapse/app/user_dir.py | 270 +++++++++++++++++++++++++++++++++++++ synapse/config/server.py | 4 + synapse/handlers/user_directory.py | 19 ++- synapse/replication/tcp/streams.py | 22 +++ synapse/storage/events.py | 18 +++ 6 files changed, 328 insertions(+), 7 deletions(-) create mode 100644 synapse/app/user_dir.py (limited to 'synapse/replication') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index e51a69074d..03327dc47a 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -51,7 +51,7 @@ import sys import logging import gc -logger = logging.getLogger("synapse.app.appservice") +logger = logging.getLogger("synapse.app.federation_sender") class FederationSenderSlaveStore( diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py new file mode 100644 index 0000000000..9d8edaa8e3 --- /dev/null +++ b/synapse/app/user_dir.py @@ -0,0 +1,270 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import synapse + +from synapse.server import HomeServer +from synapse.config._base import ConfigError +from synapse.config.logger import setup_logging +from synapse.config.homeserver import HomeServerConfig +from synapse.crypto import context_factory +from synapse.http.site import SynapseSite +from synapse.http.server import JsonResource +from synapse.metrics.resource import MetricsResource, METRICS_PREFIX +from synapse.replication.slave.storage._base import BaseSlavedStore +from synapse.replication.slave.storage.events import SlavedEventStore +from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore +from synapse.replication.slave.storage.registration import SlavedRegistrationStore +from synapse.replication.tcp.client import ReplicationClientHandler +from synapse.rest.client.v2_alpha import user_directory +from synapse.storage.engines import create_engine +from synapse.storage.client_ips import ClientIpStore +from synapse.storage.user_directory import UserDirectoryStore +from synapse.util.httpresourcetree import create_resource_tree +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext, preserve_fn +from synapse.util.manhole import manhole +from synapse.util.rlimit import change_resource_limit +from synapse.util.versionstring import get_version_string +from synapse.util.caches.stream_change_cache import StreamChangeCache + +from synapse import events + +from twisted.internet import reactor +from twisted.web.resource import Resource + +from daemonize import Daemonize + +import sys +import logging +import gc + +logger = logging.getLogger("synapse.app.user_dir") + + +class UserDirectorySlaveStore( + SlavedEventStore, + SlavedApplicationServiceStore, + SlavedRegistrationStore, + UserDirectoryStore, + BaseSlavedStore, + ClientIpStore, # After BaseSlavedStore because the constructor is different +): + def __init__(self, db_conn, hs): + super(UserDirectorySlaveStore, self).__init__(db_conn, hs) + + events_max = self._stream_id_gen.get_current_token() + curr_state_delta_prefill, min_curr_state_delta_id = self._get_cache_dict( + db_conn, "current_state_delta_stream", + entity_column="room_id", + stream_column="stream_id", + max_value=events_max, # As we share the stream id with events token + limit=1000, + ) + self._curr_state_delta_stream_cache = StreamChangeCache( + "_curr_state_delta_stream_cache", min_curr_state_delta_id, + prefilled_cache=curr_state_delta_prefill, + ) + + self._current_state_delta_pos = events_max + + def stream_positions(self): + result = super(UserDirectorySlaveStore, self).stream_positions() + result["current_state_deltas"] = self._current_state_delta_pos + return result + + def process_replication_rows(self, stream_name, token, rows): + if stream_name == "current_state_deltas": + self._current_state_delta_pos = token + for row in rows: + self._curr_state_delta_stream_cache.entity_has_changed( + row.room_id, token + ) + return super(UserDirectorySlaveStore, self).process_replication_rows( + stream_name, token, rows + ) + + +class UserDirectoryServer(HomeServer): + def get_db_conn(self, run_new_connection=True): + # Any param beginning with cp_ is a parameter for adbapi, and should + # not be passed to the database engine. + db_params = { + k: v for k, v in self.db_config.get("args", {}).items() + if not k.startswith("cp_") + } + db_conn = self.database_engine.module.connect(**db_params) + + if run_new_connection: + self.database_engine.on_new_connection(db_conn) + return db_conn + + def setup(self): + logger.info("Setting up.") + self.datastore = UserDirectorySlaveStore(self.get_db_conn(), self) + logger.info("Finished setting up.") + + def _listen_http(self, listener_config): + port = listener_config["port"] + bind_addresses = listener_config["bind_addresses"] + site_tag = listener_config.get("tag", port) + resources = {} + for res in listener_config["resources"]: + for name in res["names"]: + if name == "metrics": + resources[METRICS_PREFIX] = MetricsResource(self) + elif name == "client": + resource = JsonResource(self, canonical_json=False) + user_directory.register_servlets(self, resource) + resources.update({ + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + }) + + root_resource = create_resource_tree(resources, Resource()) + + for address in bind_addresses: + reactor.listenTCP( + port, + SynapseSite( + "synapse.access.http.%s" % (site_tag,), + site_tag, + listener_config, + root_resource, + ), + interface=address + ) + + logger.info("Synapse user_dir now listening on port %d", port) + + def start_listening(self, listeners): + for listener in listeners: + if listener["type"] == "http": + self._listen_http(listener) + elif listener["type"] == "manhole": + bind_addresses = listener["bind_addresses"] + + for address in bind_addresses: + reactor.listenTCP( + listener["port"], + manhole( + username="matrix", + password="rabbithole", + globals={"hs": self}, + ), + interface=address + ) + else: + logger.warn("Unrecognized listener type: %s", listener["type"]) + + self.get_tcp_replication().start_replication(self) + + def build_tcp_replication(self): + return UserDirectoryReplicationHandler(self) + + +class UserDirectoryReplicationHandler(ReplicationClientHandler): + def __init__(self, hs): + super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore()) + self.user_directory = hs.get_user_directory_handler() + + def on_rdata(self, stream_name, token, rows): + super(UserDirectoryReplicationHandler, self).on_rdata( + stream_name, token, rows + ) + if stream_name == "current_state_deltas": + preserve_fn(self.user_directory.notify_new_event)() + + +def start(config_options): + try: + config = HomeServerConfig.load_config( + "Synapse user directory", config_options + ) + except ConfigError as e: + sys.stderr.write("\n" + e.message + "\n") + sys.exit(1) + + assert config.worker_app == "synapse.app.user_dir" + + setup_logging(config, use_worker_options=True) + + events.USE_FROZEN_DICTS = config.use_frozen_dicts + + database_engine = create_engine(config.database_config) + + if config.update_user_directory: + sys.stderr.write( + "\nThe update_user_directory must be disabled in the main synapse process" + "\nbefore they can be run in a separate worker." + "\nPlease add ``update_user_directory: false`` to the main config" + "\n" + ) + sys.exit(1) + + # Force the pushers to start since they will be disabled in the main config + config.update_user_directory = True + + tls_server_context_factory = context_factory.ServerContextFactory(config) + + ps = UserDirectoryServer( + config.server_name, + db_config=config.database_config, + tls_server_context_factory=tls_server_context_factory, + config=config, + version_string="Synapse/" + get_version_string(synapse), + database_engine=database_engine, + ) + + ps.setup() + ps.start_listening(config.worker_listeners) + + def run(): + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + with PreserveLoggingContext(): + logger.info("Running") + change_resource_limit(config.soft_file_limit) + if config.gc_thresholds: + gc.set_threshold(*config.gc_thresholds) + reactor.run() + + def start(): + ps.get_datastore().start_profiling() + ps.get_state_handler().start_caching() + + reactor.callWhenRunning(start) + + if config.worker_daemonize: + daemon = Daemonize( + app="synapse-user-dir", + pid=config.worker_pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() + + +if __name__ == '__main__': + with LoggingContext("main"): + start(sys.argv[1:]) diff --git a/synapse/config/server.py b/synapse/config/server.py index 3910b9dc31..28b4e5f50c 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -35,6 +35,10 @@ class ServerConfig(Config): # "disable" federation self.send_federation = config.get("send_federation", True) + # Whether to update the user directory or not. This should be set to + # false only if we are updating the user directory in a worker + self.update_user_directory = config.get("update_user_directory", True) + self.filter_timeline_limit = config.get("filter_timeline_limit", -1) if self.public_baseurl is not None: diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 8928786fd6..d33a20a1f2 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -50,8 +50,7 @@ class UserDirectoyHandler(object): self.clock = hs.get_clock() self.notifier = hs.get_notifier() self.is_mine_id = hs.is_mine_id - - self.notifier.add_replication_callback(self.notify_new_event) + self.update_user_directory = hs.config.update_user_directory # When start up for the first time we need to populate the user_directory. # This is a set of user_id's we've inserted already @@ -67,9 +66,12 @@ class UserDirectoyHandler(object): # Guard to ensure we only process deltas one at a time self._is_processing = False - # We kick this off so that we don't have to wait for a change before - # we start populating the user directory - self.clock.call_later(0, self.notify_new_event) + if self.update_user_directory: + self.notifier.add_replication_callback(self.notify_new_event) + + # We kick this off so that we don't have to wait for a change before + # we start populating the user directory + self.clock.call_later(0, self.notify_new_event) def search_users(self, user_id, search_term, limit): """Searches for users in directory @@ -94,6 +96,9 @@ class UserDirectoyHandler(object): def notify_new_event(self): """Called when there may be more deltas to process """ + if not self.update_user_directory: + return + if self._is_processing: return @@ -324,7 +329,7 @@ class UserDirectoyHandler(object): event_id (str|None): The new event after the state change typ (str): Type of the event """ - logger.debug("Handling change for %s", typ) + logger.debug("Handling change for %s: %s", typ, room_id) if typ == EventTypes.RoomHistoryVisibility: change = yield self._get_key_change( @@ -394,6 +399,8 @@ class UserDirectoyHandler(object): row = yield self.store.get_user_in_public_room(user_id) if not row: yield self.store.add_users_to_public_room(room_id, [user_id]) + else: + logger.debug("Not adding user to public dir, %r", user_id) # Now we update users who share rooms with users. We do this by getting # all the current users in the room and seeing which aren't already diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py index 369d5f2428..fbafe12cc2 100644 --- a/synapse/replication/tcp/streams.py +++ b/synapse/replication/tcp/streams.py @@ -112,6 +112,12 @@ AccountDataStreamRow = namedtuple("AccountDataStream", ( "data_type", # str "data", # dict )) +CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", ( + "room_id", # str + "type", # str + "state_key", # str + "event_id", # str, optional +)) class Stream(object): @@ -443,6 +449,21 @@ class AccountDataStream(Stream): defer.returnValue(results) +class CurrentStateDeltaStream(Stream): + """Current state for a room was changed + """ + NAME = "current_state_deltas" + ROW_TYPE = CurrentStateDeltaStreamRow + + def __init__(self, hs): + store = hs.get_datastore() + + self.current_token = store.get_max_current_state_delta_stream_id + self.update_function = store.get_all_updated_current_state_deltas + + super(CurrentStateDeltaStream, self).__init__(hs) + + STREAMS_MAP = { stream.NAME: stream for stream in ( @@ -460,5 +481,6 @@ STREAMS_MAP = { FederationStream, TagAccountDataStream, AccountDataStream, + CurrentStateDeltaStream, ) } diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 72ce84b0b8..90041b0da4 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2284,6 +2284,24 @@ class EventsStore(SQLBaseStore): defer.returnValue((int(res["topological_ordering"]), int(res["stream_ordering"]))) + def get_max_current_state_delta_stream_id(self): + return self._stream_id_gen.get_current_token() + + def get_all_updated_current_state_deltas(self, from_token, to_token, limit): + def get_all_updated_current_state_deltas_txn(txn): + sql = """ + SELECT stream_id, room_id, type, state_key, event_id + FROM current_state_delta_stream + WHERE ? < stream_id AND stream_id <= ? + ORDER BY stream_id ASC LIMIT ? + """ + txn.execute(sql, (from_token, to_token, limit)) + return txn.fetchall() + return self.runInteraction( + "get_all_updated_current_state_deltas", + get_all_updated_current_state_deltas_txn, + ) + AllNewEventsResult = namedtuple("AllNewEventsResult", [ "new_forward_events", "new_backfill_events", -- cgit 1.5.1