From 5b5148b7ec2a2bdfe5c3045ade80b29ee3183abd Mon Sep 17 00:00:00 2001
From: Will Hunt
Date: Thu, 11 Aug 2016 11:48:30 +0100
Subject: Synced up synchrotron set_state with PresenceHandler set_state

---
 synapse/app/synchrotron.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'synapse/app')

diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 215ccfd522..48bc97636c 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -119,7 +119,7 @@ class SynchrotronPresence(object):
 
         reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
 
-    def set_state(self, user, state):
+    def set_state(self, user, state, ignore_status_msg=False):
         # TODO How's this supposed to work?
         pass
-- 
cgit 1.5.1


From 4e1cebd56f9688d49a51929264c095356005f9a3 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Fri, 12 Aug 2016 15:31:44 +0100
Subject: Make synchrotron accept /events

---
 synapse/app/synchrotron.py       | 36 ++++++++++++++++++++++++++++++++++--
 synapse/handlers/__init__.py     |  3 ---
 synapse/handlers/presence.py     | 27 +++++++++++++++++++--------
 synapse/rest/client/v1/events.py |  9 ++++-----
 synapse/server.py                |  9 +++++++++
 5 files changed, 66 insertions(+), 18 deletions(-)

(limited to 'synapse/app')

diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 48bc97636c..3dca1c37a0 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -26,6 +26,7 @@ from synapse.http.site import SynapseSite
 from synapse.http.server import JsonResource
 from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
 from synapse.rest.client.v2_alpha import sync
+from synapse.rest.client.v1 import events
 from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
@@ -89,17 +90,23 @@ class SynchrotronSlavedStore(
     get_presence_list_accepted = PresenceStore.__dict__[
         "get_presence_list_accepted"
     ]
+    get_presence_list_observers_accepted = PresenceStore.__dict__[
+        "get_presence_list_observers_accepted"
+    ]
+
 
 UPDATE_SYNCING_USERS_MS = 10 * 1000
 
 
 class SynchrotronPresence(object):
     def __init__(self, hs):
+        self.is_mine_id = hs.is_mine_id
         self.http_client = hs.get_simple_http_client()
         self.store = hs.get_datastore()
         self.user_to_num_current_syncs = {}
         self.syncing_users_url = hs.config.worker_replication_url + "/syncing_users"
         self.clock = hs.get_clock()
+        self.notifier = hs.get_notifier()
 
         active_presence = self.store.take_presence_startup_info()
         self.user_to_current_state = {
@@ -124,6 +131,8 @@ class SynchrotronPresence(object):
         pass
 
     get_states = PresenceHandler.get_states.__func__
+    get_state = PresenceHandler.get_state.__func__
+    _get_interested_parties = PresenceHandler._get_interested_parties.__func__
     current_state_for_users = PresenceHandler.current_state_for_users.__func__
 
     @defer.inlineCallbacks
@@ -194,19 +203,39 @@ class SynchrotronPresence(object):
             self._need_to_send_sync = False
             yield self._send_syncing_users_now()
 
+    @defer.inlineCallbacks
+    def notify_from_replication(self, states, stream_id):
+        parties = yield self._get_interested_parties(
+            states, calculate_remote_hosts=False
+        )
+        room_ids_to_states, users_to_states, _ = parties
+
+        self.notifier.on_new_event(
+            "presence_key", stream_id, rooms=room_ids_to_states.keys(),
+            users=users_to_states.keys()
+        )
+
+    @defer.inlineCallbacks
     def process_replication(self, result):
         stream = result.get("presence", {"rows": []})
+        states = []
         for row in stream["rows"]:
             (
                 position, user_id, state, last_active_ts,
                 last_federation_update_ts, last_user_sync_ts, status_msg,
                 currently_active
             ) = row
-            self.user_to_current_state[user_id] = UserPresenceState(
+            state = UserPresenceState(
                 user_id, state, last_active_ts,
                 last_federation_update_ts, last_user_sync_ts, status_msg,
                 currently_active
             )
+            self.user_to_current_state[user_id] = state
+            states.append(state)
+
+        if states and "position" in stream:
+            stream_id = int(stream["position"])
+            yield self.notify_from_replication(states, stream_id)
 
 
 class SynchrotronTyping(object):
@@ -266,10 +295,12 @@ class SynchrotronServer(HomeServer):
             elif name == "client":
                 resource = JsonResource(self, canonical_json=False)
                 sync.register_servlets(self, resource)
+                events.register_servlets(self, resource)
                 resources.update({
                     "/_matrix/client/r0": resource,
                     "/_matrix/client/unstable": resource,
                     "/_matrix/client/v2_alpha": resource,
+                    "/_matrix/client/api/v1": resource,
                 })
 
         root_resource = create_resource_tree(resources, Resource())
@@ -315,6 +346,7 @@ class SynchrotronServer(HomeServer):
         def expire_broken_caches():
             store.who_forgot_in_room.invalidate_all()
             store.get_presence_list_accepted.invalidate_all()
+            store.get_presence_list_observers_accepted.invalidate_all()
 
         def notify_from_stream(
             result, stream_name, stream_key, room=None, user=None
@@ -392,7 +424,7 @@ class SynchrotronServer(HomeServer):
                 )
                 yield store.process_replication(result)
                 typing_handler.process_replication(result)
-                presence_handler.process_replication(result)
+                yield presence_handler.process_replication(result)
                 notify(result)
             except:
                 logger.exception("Error replicating from %r", replication_url)
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 1a50a2ec98..63d05f2531 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -19,7 +19,6 @@ from .room import (
 )
 from .room_member import RoomMemberHandler
 from .message import MessageHandler
-from .events import EventStreamHandler, EventHandler
 from .federation import FederationHandler
 from .profile import ProfileHandler
 from .directory import DirectoryHandler
@@ -53,8 +52,6 @@ class Handlers(object):
         self.message_handler = MessageHandler(hs)
         self.room_creation_handler = RoomCreationHandler(hs)
         self.room_member_handler = RoomMemberHandler(hs)
-        self.event_stream_handler = EventStreamHandler(hs)
-        self.event_handler = EventHandler(hs)
         self.federation_handler = FederationHandler(hs)
         self.profile_handler = ProfileHandler(hs)
         self.directory_handler = DirectoryHandler(hs)
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 2293b5fdf7..6a1fe76c88 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -503,7 +503,7 @@ class PresenceHandler(object):
         defer.returnValue(states)
 
     @defer.inlineCallbacks
-    def _get_interested_parties(self, states):
+    def _get_interested_parties(self, states, calculate_remote_hosts=True):
         """Given a list of states return which entities (rooms, users, servers)
         are interested in the given states.
@@ -526,14 +526,15 @@ class PresenceHandler(object):
             users_to_states.setdefault(state.user_id, []).append(state)
 
         hosts_to_states = {}
-        for room_id, states in room_ids_to_states.items():
-            local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
-            if not local_states:
-                continue
+        if calculate_remote_hosts:
+            for room_id, states in room_ids_to_states.items():
+                local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
+                if not local_states:
+                    continue
 
-            hosts = yield self.store.get_joined_hosts_for_room(room_id)
-            for host in hosts:
-                hosts_to_states.setdefault(host, []).extend(local_states)
+                hosts = yield self.store.get_joined_hosts_for_room(room_id)
+                for host in hosts:
+                    hosts_to_states.setdefault(host, []).extend(local_states)
 
         for user_id, states in users_to_states.items():
             local_states = filter(lambda s: self.is_mine_id(s.user_id), states)
@@ -565,6 +566,16 @@ class PresenceHandler(object):
 
         self._push_to_remotes(hosts_to_states)
 
+    @defer.inlineCallbacks
+    def notify_for_states(self, state, stream_id):
+        parties = yield self._get_interested_parties([state])
+        room_ids_to_states, users_to_states, hosts_to_states = parties
+
+        self.notifier.on_new_event(
+            "presence_key", stream_id, rooms=room_ids_to_states.keys(),
+            users=[UserID.from_string(u) for u in users_to_states.keys()]
+        )
+
     def _push_to_remotes(self, hosts_to_states):
         """Sends state updates to remote servers.
 
diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py
index 998b115bb9..701b6f549b 100644
--- a/synapse/rest/client/v1/events.py
+++ b/synapse/rest/client/v1/events.py
@@ -34,7 +34,7 @@ class EventStreamRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(EventStreamRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
+        self.event_stream_handler = hs.get_event_stream_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request):
@@ -50,7 +50,6 @@ class EventStreamRestServlet(ClientV1RestServlet):
             if "room_id" in request.args:
                 room_id = request.args["room_id"][0]
 
-            handler = self.handlers.event_stream_handler
             pagin_config = PaginationConfig.from_request(request)
             timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS
             if "timeout" in request.args:
@@ -61,7 +60,7 @@ class EventStreamRestServlet(ClientV1RestServlet):
 
             as_client_event = "raw" not in request.args
 
-            chunk = yield handler.get_stream(
+            chunk = yield self.event_stream_handler.get_stream(
                 requester.user.to_string(),
                 pagin_config,
                 timeout=timeout,
@@ -84,12 +83,12 @@ class EventRestServlet(ClientV1RestServlet):
     def __init__(self, hs):
         super(EventRestServlet, self).__init__(hs)
         self.clock = hs.get_clock()
+        self.event_handler = hs.get_event_handler()
 
     @defer.inlineCallbacks
     def on_GET(self, request, event_id):
         requester = yield self.auth.get_user_by_req(request)
-        handler = self.handlers.event_handler
-        event = yield handler.get_event(requester.user, event_id)
+        event = yield self.event_handler.get_event(requester.user, event_id)
 
         time_now = self.clock.time_msec()
         if event:
diff --git a/synapse/server.py b/synapse/server.py
index 6bb4988309..af3246504b 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -41,6 +41,7 @@ from synapse.handlers.presence import PresenceHandler
 from synapse.handlers.room import RoomListHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import TypingHandler
+from synapse.handlers.events import EventHandler, EventStreamHandler
 from synapse.http.client import SimpleHttpClient, InsecureInterceptableContextFactory
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
 from synapse.notifier import Notifier
@@ -94,6 +95,8 @@ class HomeServer(object):
         'auth_handler',
         'device_handler',
         'e2e_keys_handler',
+        'event_handler',
+        'event_stream_handler',
         'application_service_api',
         'application_service_scheduler',
         'application_service_handler',
@@ -214,6 +217,12 @@ class HomeServer(object):
     def build_application_service_handler(self):
         return ApplicationServicesHandler(self)
 
+    def build_event_handler(self):
+        return EventHandler(self)
+
+    def build_event_stream_handler(self):
+        return EventStreamHandler(self)
+
     def build_event_sources(self):
         return EventSources(self)
 
-- 
cgit 1.5.1
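
The interesting part of the synchrotron change above is the batching in process_replication(): each replicated presence row is decoded into a UserPresenceState, and the notifier is then poked once per batch with the stream position, which is what wakes pending /sync and /events long-polls. A rough, self-contained sketch of that flow follows; UserPresenceState is reduced to a namedtuple here, and FakeNotifier plus the sample row are invented for illustration only.

from collections import namedtuple

# Field order mirrors the tuple unpacked from each replicated row in the
# patch above, minus the leading stream position.
UserPresenceState = namedtuple("UserPresenceState", [
    "user_id", "state", "last_active_ts", "last_federation_update_ts",
    "last_user_sync_ts", "status_msg", "currently_active",
])


class FakeNotifier(object):
    # Stand-in for synapse's Notifier, which wakes pending long-polls.
    def on_new_event(self, stream_key, stream_id, rooms=None, users=None):
        print("wake %s listeners at %s: rooms=%r users=%r"
              % (stream_key, stream_id, rooms, users))


def process_presence_stream(stream, user_to_current_state, notifier):
    states = []
    for row in stream["rows"]:
        # row[0] is the stream position; the rest are the presence fields.
        state = UserPresenceState(*row[1:])
        user_to_current_state[state.user_id] = state
        states.append(state)

    if states and "position" in stream:
        # One wake-up per batch, not one per row.
        notifier.on_new_event(
            "presence_key", int(stream["position"]),
            users=[s.user_id for s in states],
        )


stream = {"position": "42", "rows": [
    (42, "@alice:example.com", "online", 0, 0, 0, None, True),
]}
process_presence_stream(stream, {}, FakeNotifier())
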
From 64e7e1185392972fd85718bfa55248b041f56b82 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 15 Aug 2016 11:16:45 +0100
Subject: Implement cache replication stream

---
 synapse/app/synchrotron.py                 | 13 ---------
 synapse/replication/resource.py            | 21 ++++++++++++-
 synapse/replication/slave/storage/_base.py | 30 ++++++++++++++++++-
 synapse/storage/__init__.py                | 11 +++++--
 synapse/storage/_base.py                   | 47 ++++++++++++++++++++--------
 5 files changed, 92 insertions(+), 30 deletions(-)

(limited to 'synapse/app')

diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 3dca1c37a0..207a75d89e 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -338,16 +338,10 @@ class SynchrotronServer(HomeServer):
         http_client = self.get_simple_http_client()
         store = self.get_datastore()
         replication_url = self.config.worker_replication_url
-        clock = self.get_clock()
         notifier = self.get_notifier()
         presence_handler = self.get_presence_handler()
         typing_handler = self.get_typing_handler()
 
-        def expire_broken_caches():
-            store.who_forgot_in_room.invalidate_all()
-            store.get_presence_list_accepted.invalidate_all()
-            store.get_presence_list_observers_accepted.invalidate_all()
-
         def notify_from_stream(
             result, stream_name, stream_key, room=None, user=None
         ):
@@ -409,19 +403,12 @@ class SynchrotronServer(HomeServer):
                 result, "typing", "typing_key", room="room_id"
             )
 
-        next_expire_broken_caches_ms = 0
         while True:
             try:
                 args = store.stream_positions()
                 args.update(typing_handler.stream_positions())
                 args["timeout"] = 30000
                 result = yield http_client.get_json(replication_url, args=args)
-                now_ms = clock.time_msec()
-                if now_ms > next_expire_broken_caches_ms:
-                    expire_broken_caches()
-                    next_expire_broken_caches_ms = (
-                        now_ms + store.BROKEN_CACHE_EXPIRY_MS
-                    )
                 yield store.process_replication(result)
                 typing_handler.process_replication(result)
                 yield presence_handler.process_replication(result)
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 8c2d487ff4..84993b33b3 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -41,6 +41,7 @@ STREAM_NAMES = (
     ("push_rules",),
     ("pushers",),
     ("state",),
+    ("caches",),
 )
 
 
@@ -70,6 +71,7 @@ class ReplicationResource(Resource):
     * "backfill": Old events that have been backfilled from other servers.
     * "push_rules": Per user changes to push rules.
    * "pushers": Per user changes to their pushers.
+    * "caches": Cache invalidations.
 
     The API takes two additional query parameters:
 
@@ -129,6 +131,7 @@ class ReplicationResource(Resource):
         push_rules_token, room_stream_token = self.store.get_push_rules_stream_token()
         pushers_token = self.store.get_pushers_stream_token()
         state_token = self.store.get_state_stream_token()
+        caches_token = self.store.get_cache_stream_token()
 
         defer.returnValue(_ReplicationToken(
             room_stream_token,
@@ -140,6 +143,7 @@ class ReplicationResource(Resource):
             push_rules_token,
             pushers_token,
             state_token,
+            caches_token,
         ))
 
     @request_handler()
@@ -188,6 +192,7 @@ class ReplicationResource(Resource):
             yield self.push_rules(writer, current_token, limit, request_streams)
             yield self.pushers(writer, current_token, limit, request_streams)
             yield self.state(writer, current_token, limit, request_streams)
+            yield self.caches(writer, current_token, limit, request_streams)
             self.streams(writer, current_token, request_streams)
 
         logger.info("Replicated %d rows", writer.total)
@@ -379,6 +384,20 @@ class ReplicationResource(Resource):
                 "position", "type", "state_key", "event_id"
             ))
 
+    @defer.inlineCallbacks
+    def caches(self, writer, current_token, limit, request_streams):
+        current_position = current_token.caches
+
+        caches = request_streams.get("caches")
+
+        if caches is not None:
+            updated_caches = yield self.store.get_all_updated_caches(
+                caches, current_position, limit
+            )
+            writer.write_header_and_rows("caches", updated_caches, (
+                "position", "cache_func", "keys", "invalidation_ts"
+            ))
+
 
 class _Writer(object):
     """Writes the streams as a JSON object as the response to the request"""
@@ -407,7 +426,7 @@ class _Writer(object):
 
 class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
     "events", "presence", "typing", "receipts", "account_data", "backfill",
-    "push_rules", "pushers", "state"
+    "push_rules", "pushers", "state", "caches",
 ))):
     __slots__ = []
 
diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py
index 46e43ce1c7..24c9946d6a 100644
--- a/synapse/replication/slave/storage/_base.py
+++ b/synapse/replication/slave/storage/_base.py
@@ -14,15 +14,43 @@
 # limitations under the License.
 
 from synapse.storage._base import SQLBaseStore
+from synapse.storage.engines import PostgresEngine
 from twisted.internet import defer
 
+from ._slaved_id_tracker import SlavedIdTracker
+
+import logging
+
+logger = logging.getLogger(__name__)
+
 
 class BaseSlavedStore(SQLBaseStore):
     def __init__(self, db_conn, hs):
         super(BaseSlavedStore, self).__init__(hs)
+        if isinstance(self.database_engine, PostgresEngine):
+            self._cache_id_gen = SlavedIdTracker(
+                db_conn, "cache_stream", "stream_id",
+            )
+        else:
+            self._cache_id_gen = None
 
     def stream_positions(self):
-        return {}
+        pos = {}
+        if self._cache_id_gen:
+            pos["caches"] = self._cache_id_gen.get_current_token()
+        return pos
 
     def process_replication(self, result):
+        stream = result.get("caches")
+        if stream:
+            for row in stream["rows"]:
+                (
+                    position, cache_func, keys, invalidation_ts,
+                ) = row
+
+                try:
+                    getattr(self, cache_func).invalidate(tuple(keys))
+                except AttributeError:
+                    logger.warn("Got unexpected cache_func: %r", cache_func)
+            self._cache_id_gen.advance(int(stream["position"]))
         return defer.succeed(None)
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index a0c029a2fc..8af492b69f 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -50,6 +50,7 @@ from .openid import OpenIdStore
 from .client_ips import ClientIpStore
 
 from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
+from .engines import PostgresEngine
 
 from synapse.api.constants import PresenceState
 from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -122,9 +123,13 @@ class DataStore(RoomMemberStore, RoomStore,
             db_conn, "pushers", "id",
             extra_tables=[("deleted_pushers", "stream_id")],
         )
-        self._cache_id_gen = StreamIdGenerator(
-            db_conn, "cache_stream", "stream_id",
-        )
+
+        if isinstance(self.database_engine, PostgresEngine):
+            self._cache_id_gen = StreamIdGenerator(
+                db_conn, "cache_stream", "stream_id",
+            )
+        else:
+            self._cache_id_gen = None
 
         events_max = self._stream_id_gen.get_current_token()
         event_cache_prefill, min_event_val = self._get_cache_dict(
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 02d9098ddd..e3edc2cde6 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -19,6 +19,7 @@ from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
 from synapse.util.caches.dictionary_cache import DictionaryCache
 from synapse.util.caches.descriptors import Cache
 from synapse.util.caches import intern_dict
+from synapse.storage.engines import PostgresEngine
 
 import synapse.metrics
 
@@ -864,21 +865,43 @@ class SQLBaseStore(object):
     def _invalidate_cache_and_stream(self, txn, cache_func, keys):
         txn.call_after(cache_func.invalidate, keys)
 
-        ctx = self._cache_id_gen.get_next()
-        stream_id = ctx.__enter__()
-        txn.call_after(ctx.__exit__, None, None, None)
+        if isinstance(self.database_engine, PostgresEngine):
+            ctx = self._cache_id_gen.get_next()
+            stream_id = ctx.__enter__()
+            txn.call_after(ctx.__exit__, None, None, None)
+
+            self._simple_insert_txn(
+                txn,
+                table="cache_stream",
+                values={
+                    "stream_id": stream_id,
+                    "cache_func": cache_func.__name__,
+                    "keys": list(keys),
+                    "invalidation_ts": self.clock.time_msec(),
+                }
+            )
 
-        self._simple_insert_txn(
-            txn,
-            table="cache_stream",
-            values={
-                "stream_id": stream_id,
-                "cache_func": cache_func.__name__,
-                "keys": list(keys),
-                "invalidation_ts": self.clock.time_msec(),
-            }
+    def get_all_updated_caches(self, last_id, current_id, limit):
+        def get_all_updated_caches_txn(txn):
+            # We purposefully don't bound by the current token, as we want to
+            # send across cache invalidations as quickly as possible. Cache
+            # invalidations are idempotent, so duplicates are fine.
+            sql = (
+                "SELECT stream_id, cache_func, keys, invalidation_ts FROM cache_stream"
+                " WHERE stream_id > ? ORDER BY stream_id ASC LIMIT ?"
+            )
+            txn.execute(sql, (last_id, limit,))
+            return txn.fetchall()
+        return self.runInteraction(
+            "get_all_updated_caches", get_all_updated_caches_txn
         )
 
+    def get_cache_stream_token(self):
+        if self._cache_id_gen:
+            return self._cache_id_gen.get_current_token()
+        else:
+            return 0
+
 
 class _RollbackButIsFineException(Exception):
     """ This exception is used to rollback a transaction without implying
-- 
cgit 1.5.1
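
The cache replication stream above works in two halves: on the master, _invalidate_cache_and_stream() appends a (stream_id, cache_func, keys) row to the cache_stream table inside the invalidating transaction, and each worker replays every row past its last-seen token against its own in-memory caches, looking the cache up by attribute name. Below is a minimal in-memory sketch of that protocol under stated assumptions: every class name here is invented for illustration, the real rows live in Postgres, and the duplicate-tolerance mirrors the idempotency comment in get_all_updated_caches.

class Cache(object):
    def __init__(self):
        self.entries = {}

    def invalidate(self, key):
        self.entries.pop(key, None)


class Master(object):
    def __init__(self):
        self.rows = []          # plays the role of the cache_stream table
        self.stream_id = 0

    def invalidate_cache_and_stream(self, cache_func, keys):
        self.stream_id += 1
        self.rows.append((self.stream_id, cache_func, list(keys)))

    def get_all_updated_caches(self, last_id):
        # Duplicates are harmless: invalidation is idempotent.
        return [r for r in self.rows if r[0] > last_id]


class Slave(object):
    def __init__(self):
        self.get_user_by_id = Cache()
        self.current_token = 0

    def process_replication(self, rows):
        for stream_id, cache_func, keys in rows:
            # Unknown function names are logged and skipped upstream;
            # here we just look the cache up by attribute name.
            cache = getattr(self, cache_func, None)
            if cache is not None:
                cache.invalidate(tuple(keys))
            self.current_token = max(self.current_token, stream_id)


master, slave = Master(), Slave()
slave.get_user_by_id.entries[("@bob:example.com",)] = {"admin": False}
master.invalidate_cache_and_stream("get_user_by_id", ("@bob:example.com",))
slave.process_replication(master.get_all_updated_caches(slave.current_token))
assert slave.get_user_by_id.entries == {}

This is also why the stream makes the time-based "broken cache" expiry below unnecessary: invalidations now reach workers promptly rather than on a ten-minute timer.
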
From 784a2d4f2c3c90804bf81d85bf23a671d204dc94 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Mon, 15 Aug 2016 11:25:48 +0100
Subject: Remove broken cache stuff

---
 synapse/app/pusher.py      | 16 ----------------
 synapse/app/synchrotron.py |  5 -----
 2 files changed, 21 deletions(-)

(limited to 'synapse/app')

diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index c8dde0fcb8..8d755a4b33 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -80,11 +80,6 @@ class PusherSlaveStore(
         DataStore.get_profile_displayname.__func__
     )
 
-    # XXX: This is a bit broken because we don't persist forgotten rooms
-    # in a way that they can be streamed. This means that we don't have a
-    # way to invalidate the forgotten rooms cache correctly.
-    # For now we expire the cache every 10 minutes.
-    BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000
     who_forgot_in_room = (
         RoomMemberStore.__dict__["who_forgot_in_room"]
     )
@@ -168,7 +163,6 @@ class PusherServer(HomeServer):
         store = self.get_datastore()
         replication_url = self.config.worker_replication_url
         pusher_pool = self.get_pusherpool()
-        clock = self.get_clock()
 
         def stop_pusher(user_id, app_id, pushkey):
             key = "%s:%s" % (app_id, pushkey)
@@ -220,21 +214,11 @@ class PusherServer(HomeServer):
                     min_stream_id, max_stream_id, affected_room_ids
                 )
 
-        def expire_broken_caches():
-            store.who_forgot_in_room.invalidate_all()
-
-        next_expire_broken_caches_ms = 0
         while True:
             try:
                 args = store.stream_positions()
                 args["timeout"] = 30000
                 result = yield http_client.get_json(replication_url, args=args)
-                now_ms = clock.time_msec()
-                if now_ms > next_expire_broken_caches_ms:
-                    expire_broken_caches()
-                    next_expire_broken_caches_ms = (
-                        now_ms + store.BROKEN_CACHE_EXPIRY_MS
-                    )
                 yield store.process_replication(result)
                 poke_pushers(result)
             except:
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 207a75d89e..e3173533e2 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -75,11 +75,6 @@ class SynchrotronSlavedStore(
     BaseSlavedStore,
     ClientIpStore, # After BaseSlavedStore because the constructor is different
 ):
-    # XXX: This is a bit broken because we don't persist forgotten rooms
-    # in a way that they can be streamed. This means that we don't have a
-    # way to invalidate the forgotten rooms cache correctly.
-    # For now we expire the cache every 10 minutes.
-    BROKEN_CACHE_EXPIRY_MS = 60 * 60 * 1000
     who_forgot_in_room = (
         RoomMemberStore.__dict__["who_forgot_in_room"]
     )
-- 
cgit 1.5.1


From 396624864a8d85623ff4252196a1ced0c9440364 Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Thu, 18 Aug 2016 09:38:42 +0100
Subject: Add a media repository worker

---
 synapse/app/media_repository.py | 212 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 212 insertions(+)
 create mode 100644 synapse/app/media_repository.py

(limited to 'synapse/app')

diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
new file mode 100644
index 0000000000..a22a72ac00
--- /dev/null
+++ b/synapse/app/media_repository.py
@@ -0,0 +1,212 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.rest.media.v0.content_repository import ContentRepoResource
+from synapse.rest.media.v1.media_repository import MediaRepositoryResource
+from synapse.server import HomeServer
+from synapse.storage.client_ips import ClientIpStore
+from synapse.storage.engines import create_engine
+from synapse.storage.media_repository import MediaRepositoryStore
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+from synapse.api.urls import (
+    CONTENT_REPO_PREFIX, LEGACY_MEDIA_PREFIX, MEDIA_PREFIX
+)
+from synapse.crypto import context_factory
+
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+logger = logging.getLogger("synapse.app.media_repository")
+
+
+class MediaRepositorySlavedStore(
+    SlavedApplicationServiceStore,
+    SlavedRegistrationStore,
+    BaseSlavedStore,
+    MediaRepositoryStore,
+    ClientIpStore,
+):
+    pass
+
+
+class MediaRepositoryServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = MediaRepositorySlavedStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+                elif name == "media":
+                    media_repo = MediaRepositoryResource(self)
+                    resources.update({
+                        MEDIA_PREFIX: media_repo,
+                        LEGACY_MEDIA_PREFIX: media_repo,
+                        CONTENT_REPO_PREFIX: ContentRepoResource(
+                            self, self.config.uploads_path
+                        ),
+                    })
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse federation reader now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.worker_replication_url
+
+        while True:
+            try:
+                args = store.stream_positions()
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                yield store.process_replication(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(5)
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse media repository", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.media_repository"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    database_engine = create_engine(config.database_config)
+
+    tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+    ss = MediaRepositoryServer(
+        config.server_name,
+        db_config=config.database_config,
+        tls_server_context_factory=tls_server_context_factory,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ss.setup()
+    ss.get_handlers()
+    ss.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ss.get_datastore().start_profiling()
+        ss.replicate()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-media-repository",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
-- 
cgit 1.5.1


From 403ecd8a2cd09ca570418cc14df55fc0e165954f Mon Sep 17 00:00:00 2001
From: Mark Haines
Date: Thu, 18 Aug 2016 10:26:15 +0100
Subject: Missed a s/federation reader/media repository/ in a log message

---
 synapse/app/media_repository.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'synapse/app')

diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index a22a72ac00..9d4c4a0750 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -113,7 +113,7 @@ class MediaRepositoryServer(HomeServer):
             ),
             interface=bind_address
         )
-        logger.info("Synapse federation reader now listening on port %d", port)
+        logger.info("Synapse media repository now listening on port %d", port)
 
     def start_listening(self, listeners):
         for listener in listeners:
-- 
cgit 1.5.1
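
The replicate() loop that the media repository worker shares with the pusher and synchrotron is the whole worker replication protocol in miniature: report the current position of every followed stream, let the master long-poll for new rows, apply them, and back off on failure. A rough synchronous sketch of that loop, with fetch_json standing in for Synapse's Twisted HTTP client (the real version is an inlineCallbacks coroutine that never returns):

import time


def replicate_once(store, replication_url, fetch_json):
    # Tell the master where we are on every stream we follow; the
    # master long-polls for up to `timeout` ms for rows past that.
    args = store.stream_positions()
    args["timeout"] = 30000
    result = fetch_json(replication_url, args=args)
    # Applying the result advances the positions reported next time,
    # so the loop naturally resumes where it left off.
    store.process_replication(result)


def replicate_forever(store, replication_url, fetch_json):
    while True:
        try:
            replicate_once(store, replication_url, fetch_json)
        except Exception:
            # Back off rather than hot-loop against a dead master.
            time.sleep(5)
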
From 07229bbdae6081f0c91a60e76de8fa848903b5bd Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 18 Aug 2016 14:59:55 +0100
Subject: Add appservice worker

---
 synapse/app/appservice.py                         | 211 ++++++++++++++++++++++
 synapse/config/appservice.py                      |   1 +
 synapse/handlers/appservice.py                    |  89 +++++----
 synapse/replication/slave/storage/appservice.py   |  10 +
 synapse/replication/slave/storage/registration.py |   3 +
 synapse/storage/appservice.py                     | 145 ++++++++-------
 synapse/storage/registration.py                   |  33 ++--
 7 files changed, 369 insertions(+), 123 deletions(-)
 create mode 100644 synapse/app/appservice.py

(limited to 'synapse/app')

diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
new file mode 100644
index 0000000000..afc3709409
--- /dev/null
+++ b/synapse/app/appservice.py
@@ -0,0 +1,211 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import synapse
+
+from synapse.server import HomeServer
+from synapse.config._base import ConfigError
+from synapse.config.logger import setup_logging
+from synapse.config.homeserver import HomeServerConfig
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
+from synapse.replication.slave.storage.directory import DirectoryStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.storage.engines import create_engine
+from synapse.util.async import sleep
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.rlimit import change_resource_limit
+from synapse.util.versionstring import get_version_string
+
+from twisted.internet import reactor, defer
+from twisted.web.resource import Resource
+
+from daemonize import Daemonize
+
+import sys
+import logging
+import gc
+
+logger = logging.getLogger("synapse.app.appservice")
+
+
+class AppserviceSlaveStore(
+    DirectoryStore, SlavedEventStore, SlavedApplicationServiceStore,
+    SlavedRegistrationStore,
+):
+    pass
+
+
+class AppserviceServer(HomeServer):
+    def get_db_conn(self, run_new_connection=True):
+        # Any param beginning with cp_ is a parameter for adbapi, and should
+        # not be passed to the database engine.
+        db_params = {
+            k: v for k, v in self.db_config.get("args", {}).items()
+            if not k.startswith("cp_")
+        }
+        db_conn = self.database_engine.module.connect(**db_params)
+
+        if run_new_connection:
+            self.database_engine.on_new_connection(db_conn)
+        return db_conn
+
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = AppserviceSlaveStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_address = listener_config.get("bind_address", "")
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+
+        root_resource = create_resource_tree(resources, Resource())
+        reactor.listenTCP(
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            ),
+            interface=bind_address
+        )
+        logger.info("Synapse appservice now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                reactor.listenTCP(
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    ),
+                    interface=listener.get("bind_address", '127.0.0.1')
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+    @defer.inlineCallbacks
+    def replicate(self):
+        http_client = self.get_simple_http_client()
+        store = self.get_datastore()
+        replication_url = self.config.worker_replication_url
+        appservice_handler = self.get_application_service_handler()
+
+        @defer.inlineCallbacks
+        def replicate(results):
+            stream = results.get("events")
+            if stream:
+                max_stream_id = stream["position"]
+                yield appservice_handler.notify_interested_services(max_stream_id)
+
+        while True:
+            try:
+                logger.info("Hitting replication")
+                args = store.stream_positions()
+                args["timeout"] = 30000
+                result = yield http_client.get_json(replication_url, args=args)
+                logger.info("Got replication response")
+                yield store.process_replication(result)
+                replicate(result)
+            except:
+                logger.exception("Error replicating from %r", replication_url)
+                yield sleep(30)
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse appservice", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.appservice"
+
+    setup_logging(config.worker_log_config, config.worker_log_file)
+
+    database_engine = create_engine(config.database_config)
+
+    if config.notify_appservices:
+        sys.stderr.write(
+            "\nThe appservices must be disabled in the main synapse process"
+            "\nbefore they can be run in a separate worker."
+            "\nPlease add ``notify_appservices: false`` to the main config"
+            "\n"
+        )
+        sys.exit(1)
+
+    # Force the appservice handler to start since it will be disabled in the main config
+    config.notify_appservices = True
+
+    ps = AppserviceServer(
+        config.server_name,
+        db_config=config.database_config,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ps.setup()
+    ps.start_listening(config.worker_listeners)
+
+    def run():
+        with LoggingContext("run"):
+            logger.info("Running")
+            change_resource_limit(config.soft_file_limit)
+            if config.gc_thresholds:
+                gc.set_threshold(*config.gc_thresholds)
+            reactor.run()
+
+    def start():
+        ps.replicate()
+        ps.get_datastore().start_profiling()
+
+    reactor.callWhenRunning(start)
+
+    if config.worker_daemonize:
+        daemon = Daemonize(
+            app="synapse-appservice",
+            pid=config.worker_pid_file,
+            action=run,
+            auto_close_fds=False,
+            verbose=True,
+            logger=logger,
+        )
+        daemon.start()
+    else:
+        run()
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index eade803909..70d28892c6 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -28,6 +28,7 @@ class AppServiceConfig(Config):
 
     def read_config(self, config):
         self.app_service_config_files = config.get("app_service_config_files", [])
+        self.notify_appservices = config.get("notify_appservices", True)
 
     def default_config(cls, **kwargs):
         return """\
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 84341b0d20..6556dd1ae8 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -44,6 +44,10 @@ class ApplicationServicesHandler(object):
         self.scheduler = hs.get_application_service_scheduler()
         self.started_scheduler = False
         self.clock = hs.get_clock()
+        self.notify_appservices = hs.config.notify_appservices
+
+        self.current_max = 0
+        self.is_processing = False
 
     @defer.inlineCallbacks
     def notify_interested_services(self, current_id):
@@ -56,47 +60,56 @@ class ApplicationServicesHandler(object):
             current_id(int): The current maximum ID.
         """
         services = yield self.store.get_app_services()
-        if not services:
+        if not services or not self.notify_appservices:
             return
 
-        with Measure(self.clock, "notify_interested_services"):
-            upper_bound = current_id
-            limit = 100
-            while True:
-                upper_bound, events = yield self.store.get_new_events_for_appservice(
-                    upper_bound, limit
-                )
-
-                logger.info("Current_id: %r, upper_bound: %r", current_id, upper_bound)
-
-                if not events:
-                    break
-
-                for event in events:
-                    # Gather interested services
-                    services = yield self._get_services_for_event(event)
-                    if len(services) == 0:
-                        continue  # no services need notifying
-
-                    # Do we know this user exists? If not, poke the user query API for
-                    # all services which match that user regex. This needs to block as
-                    # these user queries need to be made BEFORE pushing the event.
-                    yield self._check_user_exists(event.sender)
-                    if event.type == EventTypes.Member:
-                        yield self._check_user_exists(event.state_key)
-
-                    if not self.started_scheduler:
-                        self.scheduler.start().addErrback(log_failure)
-                        self.started_scheduler = True
-
-                    # Fork off pushes to these services
-                    for service in services:
-                        preserve_fn(self.scheduler.submit_event_for_as)(service, event)
-
-                yield self.store.set_appservice_last_pos(upper_bound)
+        self.current_max = max(self.current_max, current_id)
+        if self.is_processing:
+            return
 
-                if len(events) < limit:
-                    break
+        with Measure(self.clock, "notify_interested_services"):
+            self.is_processing = True
+            try:
+                upper_bound = self.current_max
+                limit = 100
+                while True:
+                    upper_bound, events = yield self.store.get_new_events_for_appservice(
+                        upper_bound, limit
+                    )
+
+                    if not events:
+                        break
+
+                    for event in events:
+                        # Gather interested services
+                        services = yield self._get_services_for_event(event)
+                        if len(services) == 0:
+                            continue  # no services need notifying
+
+                        # Do we know this user exists? If not, poke the user
+                        # query API for all services which match that user regex.
+                        # This needs to block as these user queries need to be
+                        # made BEFORE pushing the event.
+                        yield self._check_user_exists(event.sender)
+                        if event.type == EventTypes.Member:
+                            yield self._check_user_exists(event.state_key)
+
+                        if not self.started_scheduler:
+                            self.scheduler.start().addErrback(log_failure)
+                            self.started_scheduler = True
+
+                        # Fork off pushes to these services
+                        for service in services:
+                            preserve_fn(self.scheduler.submit_event_for_as)(
+                                service, event
+                            )
+
+                    yield self.store.set_appservice_last_pos(upper_bound)
+
+                    if len(events) < limit:
+                        break
+            finally:
+                self.is_processing = False
 
     @defer.inlineCallbacks
     def query_user_exists(self, user_id):
diff --git a/synapse/replication/slave/storage/appservice.py b/synapse/replication/slave/storage/appservice.py
index 25792d9429..a374f2f1a2 100644
--- a/synapse/replication/slave/storage/appservice.py
+++ b/synapse/replication/slave/storage/appservice.py
@@ -28,3 +28,13 @@ class SlavedApplicationServiceStore(BaseSlavedStore):
 
     get_app_service_by_token = DataStore.get_app_service_by_token.__func__
     get_app_service_by_user_id = DataStore.get_app_service_by_user_id.__func__
+    get_app_services = DataStore.get_app_services.__func__
+    get_new_events_for_appservice = DataStore.get_new_events_for_appservice.__func__
+    create_appservice_txn = DataStore.create_appservice_txn.__func__
+    get_appservices_by_state = DataStore.get_appservices_by_state.__func__
+    get_oldest_unsent_txn = DataStore.get_oldest_unsent_txn.__func__
+    _get_last_txn = DataStore._get_last_txn.__func__
+    complete_appservice_txn = DataStore.complete_appservice_txn.__func__
+    get_appservice_state = DataStore.get_appservice_state.__func__
+    set_appservice_last_pos = DataStore.set_appservice_last_pos.__func__
+    set_appservice_state = DataStore.set_appservice_state.__func__
diff --git a/synapse/replication/slave/storage/registration.py b/synapse/replication/slave/storage/registration.py
index 38b78b97fc..e27c7332d2 100644
--- a/synapse/replication/slave/storage/registration.py
+++ b/synapse/replication/slave/storage/registration.py
@@ -28,3 +28,6 @@ class SlavedRegistrationStore(BaseSlavedStore):
     ]
 
     _query_for_auth = DataStore._query_for_auth.__func__
+    get_user_by_id = RegistrationStore.__dict__[
+        "get_user_by_id"
+    ]
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index f0c88e05cd..b496b918b7 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -218,38 +218,37 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
         Returns:
             AppServiceTransaction: A new transaction.
         """
-        return self.runInteraction(
-            "create_appservice_txn",
-            self._create_appservice_txn,
-            service, events
-        )
+        def _create_appservice_txn(txn):
+            # work out new txn id (highest txn id for this service += 1)
+            # The highest id may be the last one sent (in which case it is last_txn)
+            # or it may be the highest in the txns list (which are waiting to be/are
+            # being sent)
+            last_txn_id = self._get_last_txn(txn, service.id)
 
-    def _create_appservice_txn(self, txn, service, events):
-        # work out new txn id (highest txn id for this service += 1)
-        # The highest id may be the last one sent (in which case it is last_txn)
-        # or it may be the highest in the txns list (which are waiting to be/are
-        # being sent)
-        last_txn_id = self._get_last_txn(txn, service.id)
+            txn.execute(
+                "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
+                (service.id,)
+            )
+            highest_txn_id = txn.fetchone()[0]
+            if highest_txn_id is None:
+                highest_txn_id = 0
 
-        txn.execute(
-            "SELECT MAX(txn_id) FROM application_services_txns WHERE as_id=?",
-            (service.id,)
-        )
-        highest_txn_id = txn.fetchone()[0]
-        if highest_txn_id is None:
-            highest_txn_id = 0
+            new_txn_id = max(highest_txn_id, last_txn_id) + 1
 
-        new_txn_id = max(highest_txn_id, last_txn_id) + 1
+            # Insert new txn into txn table
+            event_ids = json.dumps([e.event_id for e in events])
+            txn.execute(
+                "INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
+                "VALUES(?,?,?)",
+                (service.id, new_txn_id, event_ids)
+            )
+            return AppServiceTransaction(
+                service=service, id=new_txn_id, events=events
+            )
 
-        # Insert new txn into txn table
-        event_ids = json.dumps([e.event_id for e in events])
-        txn.execute(
-            "INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
-            "VALUES(?,?,?)",
-            (service.id, new_txn_id, event_ids)
-        )
-        return AppServiceTransaction(
-            service=service, id=new_txn_id, events=events
+        return self.runInteraction(
+            "create_appservice_txn",
+            _create_appservice_txn,
         )
 
     def complete_appservice_txn(self, txn_id, service):
@@ -263,39 +262,38 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
             A Deferred which resolves if this transaction was stored
             successfully.
         """
-        return self.runInteraction(
-            "complete_appservice_txn",
-            self._complete_appservice_txn,
-            txn_id, service
-        )
-
-    def _complete_appservice_txn(self, txn, txn_id, service):
         txn_id = int(txn_id)
 
-        # Debugging query: Make sure the txn being completed is EXACTLY +1 from
-        # what was there before. If it isn't, we've got problems (e.g. the AS
-        # has probably missed some events), so whine loudly but still continue,
-        # since it shouldn't fail completion of the transaction.
-        last_txn_id = self._get_last_txn(txn, service.id)
-        if (last_txn_id + 1) != txn_id:
-            logger.error(
-                "appservice: Completing a transaction which has an ID > 1 from "
-                "the last ID sent to this AS. We've either dropped events or "
-                "sent it to the AS out of order. FIX ME. last_txn=%s "
-                "completing_txn=%s service_id=%s", last_txn_id, txn_id,
-                service.id
+        def _complete_appservice_txn(txn):
+            # Debugging query: Make sure the txn being completed is EXACTLY +1 from
+            # what was there before. If it isn't, we've got problems (e.g. the AS
+            # has probably missed some events), so whine loudly but still continue,
+            # since it shouldn't fail completion of the transaction.
+            last_txn_id = self._get_last_txn(txn, service.id)
+            if (last_txn_id + 1) != txn_id:
+                logger.error(
+                    "appservice: Completing a transaction which has an ID > 1 from "
+                    "the last ID sent to this AS. We've either dropped events or "
+                    "sent it to the AS out of order. FIX ME. last_txn=%s "
+                    "completing_txn=%s service_id=%s", last_txn_id, txn_id,
+                    service.id
+                )
+
+            # Set current txn_id for AS to 'txn_id'
+            self._simple_upsert_txn(
+                txn, "application_services_state", dict(as_id=service.id),
+                dict(last_txn=txn_id)
             )
 
-        # Set current txn_id for AS to 'txn_id'
-        self._simple_upsert_txn(
-            txn, "application_services_state", dict(as_id=service.id),
-            dict(last_txn=txn_id)
-        )
+            # Delete txn
+            self._simple_delete_txn(
+                txn, "application_services_txns",
+                dict(txn_id=txn_id, as_id=service.id)
+            )
 
-        # Delete txn
-        self._simple_delete_txn(
-            txn, "application_services_txns",
-            dict(txn_id=txn_id, as_id=service.id)
+        return self.runInteraction(
+            "complete_appservice_txn",
+            _complete_appservice_txn,
         )
 
     @defer.inlineCallbacks
@@ -309,10 +307,25 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
             A Deferred which resolves to an AppServiceTransaction or None.
         """
+        def _get_oldest_unsent_txn(txn):
+            # Monotonically increasing txn ids, so just select the smallest
+            # one in the txns table (we delete them when they are sent)
+            txn.execute(
+                "SELECT * FROM application_services_txns WHERE as_id=?"
+                " ORDER BY txn_id ASC LIMIT 1",
+                (service.id,)
+            )
+            rows = self.cursor_to_dict(txn)
+            if not rows:
+                return None
+
+            entry = rows[0]
+
+            return entry
+
         entry = yield self.runInteraction(
             "get_oldest_unsent_appservice_txn",
-            self._get_oldest_unsent_txn,
-            service
+            _get_oldest_unsent_txn,
         )
 
         if not entry:
@@ -326,22 +339,6 @@ class ApplicationServiceTransactionStore(SQLBaseStore):
             service=service, id=entry["txn_id"], events=events
         ))
 
-    def _get_oldest_unsent_txn(self, txn, service):
-        # Monotonically increasing txn ids, so just select the smallest
-        # one in the txns table (we delete them when they are sent)
-        txn.execute(
-            "SELECT * FROM application_services_txns WHERE as_id=?"
-            " ORDER BY txn_id ASC LIMIT 1",
-            (service.id,)
-        )
-        rows = self.cursor_to_dict(txn)
-        if not rows:
-            return None
-
-        entry = rows[0]
-
-        return entry
-
     def _get_last_txn(self, txn, service_id):
         txn.execute(
             "SELECT last_txn FROM application_services_state WHERE as_id=?",
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 19cb3b31c6..e404fa72de 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -93,7 +93,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
             desc="add_refresh_token_to_user",
         )
 
-    @defer.inlineCallbacks
     def register(self, user_id, token=None, password_hash=None,
                  was_guest=False, make_guest=False, appservice_id=None,
                  create_profile_with_localpart=None, admin=False):
@@ -115,7 +114,7 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
         Raises:
             StoreError if the user_id could not be registered.
         """
-        yield self.runInteraction(
+        return self.runInteraction(
             "register",
             self._register,
             user_id,
@@ -127,8 +126,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
             create_profile_with_localpart,
             admin
         )
-        self.get_user_by_id.invalidate((user_id,))
-        self.is_guest.invalidate((user_id,))
 
     def _register(
         self,
@@ -210,6 +207,11 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
                 (create_profile_with_localpart,)
             )
 
+        self._invalidate_cache_and_stream(
+            txn, self.get_user_by_id, (user_id,)
+        )
+        txn.call_after(self.is_guest.invalidate, (user_id,))
+
     @cached()
     def get_user_by_id(self, user_id):
         return self._simple_select_one(
@@ -236,19 +238,28 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
 
         return self.runInteraction("get_users_by_id_case_insensitive", f)
 
-    @defer.inlineCallbacks
     def user_set_password_hash(self, user_id, password_hash):
         """
         NB. This does *not* evict any cache because the one use for this
             removes most of the entries subsequently anyway so it would be
             pointless. Use flush_user separately.
         """
-        yield self._simple_update_one('users', {
-            'name': user_id
-        }, {
-            'password_hash': password_hash
-        })
-        self.get_user_by_id.invalidate((user_id,))
+        def user_set_password_hash_txn(txn):
+            self._simple_update_one_txn(
+                txn,
+                'users', {
+                    'name': user_id
+                },
+                {
+                    'password_hash': password_hash
+                }
+            )
+            self._invalidate_cache_and_stream(
+                txn, self.get_user_by_id, (user_id,)
+            )
+        return self.runInteraction(
+            "user_set_password_hash", user_set_password_hash_txn
+        )
 
     @defer.inlineCallbacks
     def user_delete_access_tokens(self, user_id, except_token_id=None,
-- 
cgit 1.5.1


From bcbd74dc5b7d1918dd3d4460ad1a8dcfba455f07 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Thu, 18 Aug 2016 15:52:10 +0100
Subject: Remove log lines

---
 synapse/app/appservice.py | 2 --
 1 file changed, 2 deletions(-)

(limited to 'synapse/app')

diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index afc3709409..57587aed25 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -128,11 +128,9 @@ class AppserviceServer(HomeServer):
 
         while True:
             try:
-                logger.info("Hitting replication")
                 args = store.stream_positions()
                 args["timeout"] = 30000
                 result = yield http_client.get_json(replication_url, args=args)
-                logger.info("Got replication response")
                 yield store.process_replication(result)
                 replicate(result)
             except:
-- 
cgit 1.5.1