From ed787cf09edd77e39ad9da0b957359214de85287 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 16 Nov 2016 17:34:44 +0000 Subject: Hook up the send queue and create a federation sender worker --- synapse/replication/resource.py | 24 +++++++++++++++++++++++ synapse/replication/slave/storage/deviceinbox.py | 5 +++++ synapse/replication/slave/storage/transactions.py | 3 +++ 3 files changed, 32 insertions(+) (limited to 'synapse/replication') diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 5a14c51d23..a77312ae34 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -44,6 +44,7 @@ STREAM_NAMES = ( ("caches",), ("to_device",), ("public_rooms",), + ("federation",), ) @@ -116,8 +117,10 @@ class ReplicationResource(Resource): self.sources = hs.get_event_sources() self.presence_handler = hs.get_presence_handler() self.typing_handler = hs.get_typing_handler() + self.federation_sender = hs.get_federation_sender() self.notifier = hs.notifier self.clock = hs.get_clock() + self.config = hs.get_config() self.putChild("remove_pushers", PusherResource(hs)) self.putChild("syncing_users", PresenceResource(hs)) @@ -134,6 +137,7 @@ class ReplicationResource(Resource): pushers_token = self.store.get_pushers_stream_token() caches_token = self.store.get_cache_stream_token() public_rooms_token = self.store.get_current_public_room_stream_id() + federation_token = self.federation_sender.get_current_token() defer.returnValue(_ReplicationToken( room_stream_token, @@ -148,6 +152,7 @@ class ReplicationResource(Resource): caches_token, int(stream_token.to_device_key), int(public_rooms_token), + int(federation_token), )) @request_handler() @@ -202,6 +207,7 @@ class ReplicationResource(Resource): yield self.caches(writer, current_token, limit, request_streams) yield self.to_device(writer, current_token, limit, request_streams) yield self.public_rooms(writer, current_token, limit, request_streams) + self.federation(writer, current_token, limit, request_streams) self.streams(writer, current_token, request_streams) logger.debug("Replicated %d rows", writer.total) @@ -465,6 +471,23 @@ class ReplicationResource(Resource): "position", "room_id", "visibility" ), position=upto_token) + def federation(self, writer, current_token, limit, request_streams): + if self.config.send_federation: + return + + current_position = current_token.federation + + federation = request_streams.get("federation") + + if federation is not None and federation != current_position: + federation_rows = self.federation_sender.get_replication_rows( + federation, limit, + ) + upto_token = _position_from_rows(federation_rows, current_position) + writer.write_header_and_rows("federation", federation_rows, ( + "position", "type", "content", + ), position=upto_token) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -497,6 +520,7 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", "push_rules", "pushers", "state", "caches", "to_device", "public_rooms", + "federation", ))): __slots__ = [] diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 3bfd5e8213..373212d42d 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -29,9 +29,14 @@ class SlavedDeviceInboxStore(BaseSlavedStore): "DeviceInboxStreamChangeCache", self._device_inbox_id_gen.get_current_token() ) + self._device_federation_outbox_stream_cache = StreamChangeCache( + "DeviceFederationOutboxStreamChangeCache", + self._device_inbox_id_gen.get_current_token() + ) get_to_device_stream_token = DataStore.get_to_device_stream_token.__func__ get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__ + get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__ delete_messages_for_device = DataStore.delete_messages_for_device.__func__ def stream_positions(self): diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py index 6f2ba98af5..c459301b76 100644 --- a/synapse/replication/slave/storage/transactions.py +++ b/synapse/replication/slave/storage/transactions.py @@ -25,6 +25,9 @@ class TransactionStore(BaseSlavedStore): ].orig _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__ + def prep_send_transaction(self, transaction_id, destination, origin_server_ts): + return [] + # For now, don't record the destination rety timings def set_destination_retry_timings(*args, **kwargs): return defer.succeed(None) -- cgit 1.4.1 From f8ee66250a16cb9dd3af01fb1150ff18cfebbc39 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 17 Nov 2016 15:46:44 +0000 Subject: Handle sending events and device messages over federation --- synapse/app/federation_sender.py | 31 ++++++++++-------- synapse/federation/send_queue.py | 38 ++++++++++++++++++---- synapse/federation/transaction_queue.py | 32 ++++++++++++++++++ synapse/handlers/message.py | 13 +------- synapse/notifier.py | 2 ++ synapse/replication/resource.py | 2 +- synapse/replication/slave/storage/deviceinbox.py | 15 ++++++--- synapse/replication/slave/storage/events.py | 11 +++++++ synapse/replication/slave/storage/transactions.py | 4 +-- synapse/storage/deviceinbox.py | 26 ++++++++------- synapse/storage/prepare_database.py | 2 +- .../delta/39/device_federation_stream_idx.sql | 16 +++++++++ synapse/storage/stream.py | 31 ++++++++++++++++++ synapse/util/jsonobject.py | 17 ++++++++-- 14 files changed, 185 insertions(+), 55 deletions(-) create mode 100644 synapse/storage/schema/delta/39/device_federation_stream_idx.sql (limited to 'synapse/replication') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 7a4fec4a66..32113c175c 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -127,13 +127,6 @@ class FederationSenderServer(HomeServer): replication_url = self.config.worker_replication_url send_handler = self._get_send_handler() - def replicate(results): - stream = results.get("events") - if stream: - # max_stream_id = stream["position"] - # TODO - pass - while True: try: args = store.stream_positions() @@ -142,7 +135,6 @@ class FederationSenderServer(HomeServer): result = yield http_client.get_json(replication_url, args=args) yield store.process_replication(result) send_handler.process_replication(result) - replicate(result) except: logger.exception("Error replicating from %r", replication_url) yield sleep(30) @@ -242,16 +234,17 @@ class FederationSenderHandler(object): return {"federation": self._latest_room_serial} def process_replication(self, result): - stream = result.get("federation") - if stream: - self._latest_room_serial = int(stream["position"]) + fed_stream = result.get("federation") + if fed_stream: + self._latest_room_serial = int(fed_stream["position"]) presence_to_send = {} keyed_edus = {} edus = {} failures = {} + device_destinations = set() - for row in stream["rows"]: + for row in fed_stream["rows"]: position, typ, content_js = row content = json.loads(content_js) @@ -264,7 +257,9 @@ class FederationSenderHandler(object): key = content["key"] edu = Edu(**content["edu"]) - keyed_edus.setdefault(edu.destination, {})[key] = edu + keyed_edus.setdefault( + edu.destination, {} + )[(edu.destination, tuple(key))] = edu elif typ == send_queue.EDU_TYPE: edu = Edu(**content) @@ -274,6 +269,8 @@ class FederationSenderHandler(object): failure = content["failure"] failures.setdefault(destination, []).append(failure) + elif typ == send_queue.DEVICE_MESSAGE_TYPE: + device_destinations.add(content["destination"]) else: raise Exception("Unrecognised federation type: %r", typ) @@ -296,6 +293,14 @@ class FederationSenderHandler(object): for failure in failure_list: self.federation_sender.send_failure(destination, failure) + for destination in device_destinations: + self.federation_sender.send_device_messages(destination) + + event_stream = result.get("events") + if event_stream: + latest_pos = event_stream["position"] + self.federation_sender.notify_new_events(latest_pos) + if __name__ == '__main__': with LoggingContext("main"): diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index d439be050a..3fc625c4dd 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -23,11 +23,13 @@ PRESENCE_TYPE = "p" KEYED_EDU_TYPE = "k" EDU_TYPE = "e" FAILURE_TYPE = "f" +DEVICE_MESSAGE_TYPE = "d" class FederationRemoteSendQueue(object): def __init__(self, hs): + self.server_name = hs.hostname self.clock = hs.get_clock() # TODO: Add metrics for size of lists below @@ -45,6 +47,8 @@ class FederationRemoteSendQueue(object): self.pos = 1 self.pos_time = sorteddict() + self.device_messages = sorteddict() + self.clock.looping_call(self._clear_queue, 30 * 1000) def _next_pos(self): @@ -111,6 +115,15 @@ class FederationRemoteSendQueue(object): for key in keys[:i]: del self.failures[key] + # Delete things out of device map + keys = self.device_messages.keys() + i = keys.bisect_left(position_to_delete) + for key in keys[:i]: + del self.device_messages[key] + + def notify_new_events(self, current_id): + pass + def send_edu(self, destination, edu_type, content, key=None): pos = self._next_pos() @@ -122,6 +135,7 @@ class FederationRemoteSendQueue(object): ) if key: + assert isinstance(key, tuple) self.keyed_edu[(destination, key)] = edu self.keyed_edu_changed[pos] = (destination, key) else: @@ -148,9 +162,9 @@ class FederationRemoteSendQueue(object): # This gets sent down a separate path pass - def notify_new_device_message(self, destination): - # TODO - pass + def send_device_messages(self, destination): + pos = self._next_pos() + self.device_messages[pos] = destination def get_current_token(self): return self.pos - 1 @@ -188,11 +202,11 @@ class FederationRemoteSendQueue(object): i = keys.bisect_right(token) keyed_edus = set((k, self.keyed_edu_changed[k]) for k in keys[i:]) - for (pos, edu_key) in keyed_edus: + for (pos, (destination, edu_key)) in keyed_edus: rows.append( (pos, KEYED_EDU_TYPE, ujson.dumps({ "key": edu_key, - "edu": self.keyed_edu[edu_key].get_dict(), + "edu": self.keyed_edu[(destination, edu_key)].get_internal_dict(), })) ) @@ -202,7 +216,7 @@ class FederationRemoteSendQueue(object): edus = set((k, self.edus[k]) for k in keys[i:]) for (pos, edu) in edus: - rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_dict()))) + rows.append((pos, EDU_TYPE, ujson.dumps(edu.get_internal_dict()))) # Fetch changed failures keys = self.failures.keys() @@ -210,11 +224,21 @@ class FederationRemoteSendQueue(object): failures = set((k, self.failures[k]) for k in keys[i:]) for (pos, (destination, failure)) in failures: - rows.append((pos, None, FAILURE_TYPE, ujson.dumps({ + rows.append((pos, FAILURE_TYPE, ujson.dumps({ "destination": destination, "failure": failure, }))) + # Fetch changed device messages + keys = self.device_messages.keys() + i = keys.bisect_right(token) + device_messages = set((k, self.device_messages[k]) for k in keys[i:]) + + for (pos, destination) in device_messages: + rows.append((pos, DEVICE_MESSAGE_TYPE, ujson.dumps({ + "destination": destination, + }))) + # Sort rows based on pos rows.sort() diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 5d4f244377..aa664beead 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,6 +26,7 @@ from synapse.util.retryutils import ( get_retry_limiter, NotRetryingDestination, ) from synapse.util.metrics import measure_func +from synapse.types import get_domain_from_id from synapse.handlers.presence import format_user_presence_state import synapse.metrics @@ -54,6 +55,7 @@ class TransactionQueue(object): self.server_name = hs.hostname self.store = hs.get_datastore() + self.state = hs.get_state_handler() self.transaction_actions = TransactionActions(self.store) self.transport_layer = hs.get_federation_transport_client() @@ -103,6 +105,9 @@ class TransactionQueue(object): self._order = 1 + self._is_processing = False + self._last_token = 0 + def can_send_to(self, destination): """Can we send messages to the given server? @@ -123,6 +128,33 @@ class TransactionQueue(object): else: return not destination.startswith("localhost") + @defer.inlineCallbacks + def notify_new_events(self, current_id): + if self._is_processing: + return + + try: + self._is_processing = True + while True: + self._last_token, events = yield self.store.get_all_new_events_stream( + self._last_token, current_id, limit=20, + ) + + if not events: + break + + for event in events: + users_in_room = yield self.state.get_current_user_in_room( + event.room_id, latest_event_ids=[event.event_id], + ) + + destinations = [ + get_domain_from_id(user_id) for user_id in users_in_room + ] + self.send_pdu(event, destinations) + finally: + self._is_processing = False + def send_pdu(self, pdu, destinations): # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 81df45177a..fd09397226 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -22,7 +22,7 @@ from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator from synapse.push.action_generator import ActionGenerator from synapse.types import ( - UserID, RoomAlias, RoomStreamToken, get_domain_from_id + UserID, RoomAlias, RoomStreamToken, ) from synapse.util.async import run_on_reactor, ReadWriteLock from synapse.util.logcontext import preserve_fn @@ -599,13 +599,6 @@ class MessageHandler(BaseHandler): event_stream_id, max_stream_id ) - users_in_room = yield self.store.get_joined_users_from_context(event, context) - - destinations = [ - get_domain_from_id(user_id) for user_id in users_in_room - if not self.hs.is_mine_id(user_id) - ] - @defer.inlineCallbacks def _notify(): yield run_on_reactor() @@ -618,7 +611,3 @@ class MessageHandler(BaseHandler): # If invite, remove room_state from unsigned before sending. event.unsigned.pop("invite_room_state", None) - - preserve_fn(federation_handler.handle_new_event)( - event, destinations=destinations, - ) diff --git a/synapse/notifier.py b/synapse/notifier.py index 48653ae843..d528d1c1e0 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -143,6 +143,7 @@ class Notifier(object): self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() + self.federation_sender = hs.get_federation_sender() self.state_handler = hs.get_state_handler() self.clock.looping_call( @@ -219,6 +220,7 @@ class Notifier(object): """Notify any user streams that are interested in this room event""" # poke any interested application service. self.appservice_handler.notify_interested_services(room_stream_id) + self.federation_sender.notify_new_events(room_stream_id) if event.type == EventTypes.Member and event.membership == Membership.JOIN: self._user_joined_room(event.state_key, event.room_id) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index a77312ae34..e708811326 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -453,7 +453,7 @@ class ReplicationResource(Resource): ) upto_token = _position_from_rows(to_device_rows, current_position) writer.write_header_and_rows("to_device", to_device_rows, ( - "position", "user_id", "device_id", "message_json" + "position", "entity", ), position=upto_token) @defer.inlineCallbacks diff --git a/synapse/replication/slave/storage/deviceinbox.py b/synapse/replication/slave/storage/deviceinbox.py index 373212d42d..cc860f9f9b 100644 --- a/synapse/replication/slave/storage/deviceinbox.py +++ b/synapse/replication/slave/storage/deviceinbox.py @@ -38,6 +38,7 @@ class SlavedDeviceInboxStore(BaseSlavedStore): get_new_messages_for_device = DataStore.get_new_messages_for_device.__func__ get_new_device_msgs_for_remote = DataStore.get_new_device_msgs_for_remote.__func__ delete_messages_for_device = DataStore.delete_messages_for_device.__func__ + delete_device_msgs_for_remote = DataStore.delete_device_msgs_for_remote.__func__ def stream_positions(self): result = super(SlavedDeviceInboxStore, self).stream_positions() @@ -50,9 +51,15 @@ class SlavedDeviceInboxStore(BaseSlavedStore): self._device_inbox_id_gen.advance(int(stream["position"])) for row in stream["rows"]: stream_id = row[0] - user_id = row[1] - self._device_inbox_stream_cache.entity_has_changed( - user_id, stream_id - ) + entity = row[1] + + if entity.startswith("@"): + self._device_inbox_stream_cache.entity_has_changed( + entity, stream_id + ) + else: + self._device_federation_outbox_stream_cache.entity_has_changed( + entity, stream_id + ) return super(SlavedDeviceInboxStore, self).process_replication(result) diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 0c26e96e98..ef8713b55d 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -26,6 +26,11 @@ from synapse.storage.stream import StreamStore from synapse.util.caches.stream_change_cache import StreamChangeCache import ujson as json +import logging + + +logger = logging.getLogger(__name__) + # So, um, we want to borrow a load of functions intended for reading from # a DataStore, but we don't want to take functions that either write to the @@ -180,6 +185,8 @@ class SlavedEventStore(BaseSlavedStore): EventFederationStore.__dict__["_get_forward_extremeties_for_room"] ) + get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__ + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() @@ -194,6 +201,10 @@ class SlavedEventStore(BaseSlavedStore): stream = result.get("events") if stream: self._stream_id_gen.advance(int(stream["position"])) + + if stream["rows"]: + logger.info("Got %d event rows", len(stream["rows"])) + for row in stream["rows"]: self._process_replication_row( row, backfilled=False, state_resets=state_resets diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py index c459301b76..d92cea4ab1 100644 --- a/synapse/replication/slave/storage/transactions.py +++ b/synapse/replication/slave/storage/transactions.py @@ -25,8 +25,8 @@ class TransactionStore(BaseSlavedStore): ].orig _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__ - def prep_send_transaction(self, transaction_id, destination, origin_server_ts): - return [] + prep_send_transaction = DataStore.prep_send_transaction.__func__ + delivered_txn = DataStore.delivered_txn.__func__ # For now, don't record the destination rety timings def set_destination_retry_timings(*args, **kwargs): diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py index f640e73714..87398d60bc 100644 --- a/synapse/storage/deviceinbox.py +++ b/synapse/storage/deviceinbox.py @@ -269,27 +269,29 @@ class DeviceInboxStore(SQLBaseStore): return defer.succeed([]) def get_all_new_device_messages_txn(txn): + # We limit like this as we might have multiple rows per stream_id, and + # we want to make sure we always get all entries for any stream_id + # we return. + upper_pos = min(current_pos, last_pos + limit) sql = ( - "SELECT stream_id FROM device_inbox" + "SELECT stream_id, user_id" + " FROM device_inbox" " WHERE ? < stream_id AND stream_id <= ?" - " GROUP BY stream_id" " ORDER BY stream_id ASC" - " LIMIT ?" ) - txn.execute(sql, (last_pos, current_pos, limit)) - stream_ids = txn.fetchall() - if not stream_ids: - return [] - max_stream_id_in_limit = stream_ids[-1] + txn.execute(sql, (last_pos, upper_pos)) + rows = txn.fetchall() sql = ( - "SELECT stream_id, user_id, device_id, message_json" - " FROM device_inbox" + "SELECT stream_id, destination" + " FROM device_federation_outbox" " WHERE ? < stream_id AND stream_id <= ?" " ORDER BY stream_id ASC" ) - txn.execute(sql, (last_pos, max_stream_id_in_limit)) - return txn.fetchall() + txn.execute(sql, (last_pos, upper_pos)) + rows.extend(txn.fetchall()) + + return rows return self.runInteraction( "get_all_new_device_messages", get_all_new_device_messages_txn diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 6576a30098..e46ae6502e 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -25,7 +25,7 @@ logger = logging.getLogger(__name__) # Remember to update this number every time a change is made to database # schema files, so the users will be informed on server restarts. -SCHEMA_VERSION = 38 +SCHEMA_VERSION = 39 dir_path = os.path.abspath(os.path.dirname(__file__)) diff --git a/synapse/storage/schema/delta/39/device_federation_stream_idx.sql b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql new file mode 100644 index 0000000000..00be801e90 --- /dev/null +++ b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql @@ -0,0 +1,16 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX device_federation_outbox_id ON device_federation_outbox(stream_id); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 888b1cb35d..f34cb78f9a 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -765,3 +765,34 @@ class StreamStore(SQLBaseStore): "token": end_token, }, } + + @defer.inlineCallbacks + def get_all_new_events_stream(self, from_id, current_id, limit): + """Get all new events""" + + def get_all_new_events_stream_txn(txn): + sql = ( + "SELECT e.stream_ordering, e.event_id" + " FROM events AS e" + " WHERE" + " ? < e.stream_ordering AND e.stream_ordering <= ?" + " ORDER BY e.stream_ordering ASC" + " LIMIT ?" + ) + + txn.execute(sql, (from_id, current_id, limit)) + rows = txn.fetchall() + + upper_bound = current_id + if len(rows) == limit: + upper_bound = rows[-1][0] + + return upper_bound, [row[1] for row in rows] + + upper_bound, event_ids = yield self.runInteraction( + "get_all_new_events_stream", get_all_new_events_stream_txn, + ) + + events = yield self._get_events(event_ids) + + defer.returnValue((upper_bound, events)) diff --git a/synapse/util/jsonobject.py b/synapse/util/jsonobject.py index 3fd5c3d9fd..d668e5a6b8 100644 --- a/synapse/util/jsonobject.py +++ b/synapse/util/jsonobject.py @@ -76,15 +76,26 @@ class JsonEncodedObject(object): d.update(self.unrecognized_keys) return d + def get_internal_dict(self): + d = { + k: _encode(v, internal=True) for (k, v) in self.__dict__.items() + if k in self.valid_keys + } + d.update(self.unrecognized_keys) + return d + def __str__(self): return "(%s, %s)" % (self.__class__.__name__, repr(self.__dict__)) -def _encode(obj): +def _encode(obj, internal=False): if type(obj) is list: - return [_encode(o) for o in obj] + return [_encode(o, internal=internal) for o in obj] if isinstance(obj, JsonEncodedObject): - return obj.get_dict() + if internal: + return obj.get_internal_dict() + else: + return obj.get_dict() return obj -- cgit 1.4.1 From 7c9cdb22453d1a442e5c280149aeeff4d46da215 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 21 Nov 2016 11:28:37 +0000 Subject: Store federation stream positions in the database --- synapse/app/federation_sender.py | 38 +++++++++++++--------- synapse/federation/transaction_queue.py | 21 +++++++++--- synapse/replication/slave/storage/events.py | 3 ++ synapse/storage/_base.py | 18 +++++++--- .../schema/delta/39/federation_out_position.sql | 22 +++++++++++++ synapse/storage/stream.py | 16 +++++++++ 6 files changed, 94 insertions(+), 24 deletions(-) create mode 100644 synapse/storage/schema/delta/39/federation_out_position.sql (limited to 'synapse/replication') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 32113c175c..6678667c35 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -125,27 +125,22 @@ class FederationSenderServer(HomeServer): http_client = self.get_simple_http_client() store = self.get_datastore() replication_url = self.config.worker_replication_url - send_handler = self._get_send_handler() + send_handler = FederationSenderHandler(self) + + send_handler.on_start() while True: try: args = store.stream_positions() - args.update(send_handler.stream_positions()) + args.update((yield send_handler.stream_positions())) args["timeout"] = 30000 result = yield http_client.get_json(replication_url, args=args) yield store.process_replication(result) - send_handler.process_replication(result) + yield send_handler.process_replication(result) except: logger.exception("Error replicating from %r", replication_url) yield sleep(30) - def _get_send_handler(self): - try: - return self._send_handler - except AttributeError: - self._send_handler = FederationSenderHandler(self) - return self._send_handler - def start(config_options): try: @@ -221,22 +216,29 @@ def start(config_options): class FederationSenderHandler(object): def __init__(self, hs): + self.store = hs.get_datastore() self.federation_sender = hs.get_federation_sender() - self._latest_room_serial = -1 self._room_serials = {} self._room_typing = {} + def on_start(self): + # There may be some events that are persisted but haven't been sent, + # so send them now. + self.federation_sender.notify_new_events( + self.store.get_room_max_stream_ordering() + ) + + @defer.inlineCallbacks def stream_positions(self): - # We must update this token from the response of the previous - # sync. In particular, the stream id may "reset" back to zero/a low - # value which we *must* use for the next replication request. - return {"federation": self._latest_room_serial} + stream_id = yield self.store.get_federation_out_pos("federation") + defer.returnValue({"federation": stream_id}) + @defer.inlineCallbacks def process_replication(self, result): fed_stream = result.get("federation") if fed_stream: - self._latest_room_serial = int(fed_stream["position"]) + latest_id = int(fed_stream["position"]) presence_to_send = {} keyed_edus = {} @@ -296,6 +298,10 @@ class FederationSenderHandler(object): for destination in device_destinations: self.federation_sender.send_device_messages(destination) + yield self.store.update_federation_out_pos( + "federation", latest_id + ) + event_stream = result.get("events") if event_stream: latest_pos = event_stream["position"] diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index aa664beead..1b0ea070c2 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -106,7 +106,7 @@ class TransactionQueue(object): self._order = 1 self._is_processing = False - self._last_token = 0 + self._last_poked_id = -1 def can_send_to(self, destination): """Can we send messages to the given server? @@ -130,17 +130,22 @@ class TransactionQueue(object): @defer.inlineCallbacks def notify_new_events(self, current_id): + self._last_poked_id = max(current_id, self._last_poked_id) + if self._is_processing: return try: self._is_processing = True while True: - self._last_token, events = yield self.store.get_all_new_events_stream( - self._last_token, current_id, limit=20, + last_token = yield self.store.get_federation_out_pos("events") + next_token, events = yield self.store.get_all_new_events_stream( + last_token, self._last_poked_id, limit=20, ) - if not events: + logger.debug("Handling %s -> %s", last_token, next_token) + + if not events and next_token >= self._last_poked_id: break for event in events: @@ -151,7 +156,15 @@ class TransactionQueue(object): destinations = [ get_domain_from_id(user_id) for user_id in users_in_room ] + + logger.debug("Sending %s to %r", event, destinations) + self.send_pdu(event, destinations) + + yield self.store.update_federation_out_pos( + "events", next_token + ) + finally: self._is_processing = False diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index ef8713b55d..64f18bbb3e 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -187,6 +187,9 @@ class SlavedEventStore(BaseSlavedStore): get_all_new_events_stream = DataStore.get_all_new_events_stream.__func__ + get_federation_out_pos = DataStore.get_federation_out_pos.__func__ + update_federation_out_pos = DataStore.update_federation_out_pos.__func__ + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index d828d6ee1d..d3686b9690 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -561,12 +561,17 @@ class SQLBaseStore(object): @staticmethod def _simple_select_onecol_txn(txn, table, keyvalues, retcol): + if keyvalues: + where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) + else: + where = "" + sql = ( - "SELECT %(retcol)s FROM %(table)s WHERE %(where)s" + "SELECT %(retcol)s FROM %(table)s %(where)s" ) % { "retcol": retcol, "table": table, - "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()), + "where": where, } txn.execute(sql, keyvalues.values()) @@ -744,10 +749,15 @@ class SQLBaseStore(object): @staticmethod def _simple_update_one_txn(txn, table, keyvalues, updatevalues): - update_sql = "UPDATE %s SET %s WHERE %s" % ( + if keyvalues: + where = " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys()) + else: + where = "" + + update_sql = "UPDATE %s SET %s %s" % ( table, ", ".join("%s = ?" % (k,) for k in updatevalues), - " AND ".join("%s = ?" % (k,) for k in keyvalues) + where, ) txn.execute( diff --git a/synapse/storage/schema/delta/39/federation_out_position.sql b/synapse/storage/schema/delta/39/federation_out_position.sql new file mode 100644 index 0000000000..edbd8e132f --- /dev/null +++ b/synapse/storage/schema/delta/39/federation_out_position.sql @@ -0,0 +1,22 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + CREATE TABLE federation_stream_position( + type TEXT NOT NULL, + stream_id INTEGER NOT NULL + ); + + INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1); + INSERT INTO federation_stream_position (type, stream_id) VALUES ('events', -1); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index f34cb78f9a..7fa63b58a7 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -796,3 +796,19 @@ class StreamStore(SQLBaseStore): events = yield self._get_events(event_ids) defer.returnValue((upper_bound, events)) + + def get_federation_out_pos(self, typ): + return self._simple_select_one_onecol( + table="federation_stream_position", + retcol="stream_id", + keyvalues={"type": typ}, + desc="get_federation_out_pos" + ) + + def update_federation_out_pos(self, typ, stream_id): + return self._simple_update_one( + table="federation_stream_position", + keyvalues={"type": typ}, + updatevalues={"stream_id": stream_id}, + desc="update_federation_out_pos", + ) -- cgit 1.4.1 From 90565d015e97a494f516cc6f06596ca5c6d490ec Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 22 Nov 2016 17:45:44 +0000 Subject: Invalidate retry cache in both directions --- synapse/replication/expire_cache.py | 60 +++++++++++++++++++++++ synapse/replication/resource.py | 2 + synapse/replication/slave/storage/_base.py | 19 +++++++ synapse/replication/slave/storage/transactions.py | 9 ++-- synapse/storage/transactions.py | 48 +++++++++++++----- synapse/util/retryutils.py | 21 ++++---- 6 files changed, 132 insertions(+), 27 deletions(-) create mode 100644 synapse/replication/expire_cache.py (limited to 'synapse/replication') diff --git a/synapse/replication/expire_cache.py b/synapse/replication/expire_cache.py new file mode 100644 index 0000000000..c05a50d7a6 --- /dev/null +++ b/synapse/replication/expire_cache.py @@ -0,0 +1,60 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.http.server import respond_with_json_bytes, request_handler +from synapse.http.servlet import parse_json_object_from_request + +from twisted.web.resource import Resource +from twisted.web.server import NOT_DONE_YET + + +class ExpireCacheResource(Resource): + """ + HTTP endpoint for expiring storage caches. + + POST /_synapse/replication/expire_cache HTTP/1.1 + Content-Type: application/json + + { + "invalidate": [ + { + "name": "func_name", + "keys": ["key1", "key2"] + } + ] + } + """ + + def __init__(self, hs): + Resource.__init__(self) # Resource is old-style, so no super() + + self.store = hs.get_datastore() + self.version_string = hs.version_string + self.clock = hs.get_clock() + + def render_POST(self, request): + self._async_render_POST(request) + return NOT_DONE_YET + + @request_handler() + def _async_render_POST(self, request): + content = parse_json_object_from_request(request) + + for row in content["invalidate"]: + name = row["name"] + keys = tuple(row["keys"]) + + getattr(self.store, name).invalidate(keys) + + respond_with_json_bytes(request, 200, "{}") diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index e708811326..b05ca62710 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -17,6 +17,7 @@ from synapse.http.servlet import parse_integer, parse_string from synapse.http.server import request_handler, finish_request from synapse.replication.pusher_resource import PusherResource from synapse.replication.presence_resource import PresenceResource +from synapse.replication.expire_cache import ExpireCacheResource from synapse.api.errors import SynapseError from twisted.web.resource import Resource @@ -124,6 +125,7 @@ class ReplicationResource(Resource): self.putChild("remove_pushers", PusherResource(hs)) self.putChild("syncing_users", PresenceResource(hs)) + self.putChild("expire_cache", ExpireCacheResource(hs)) def render_GET(self, request): self._async_render_GET(request) diff --git a/synapse/replication/slave/storage/_base.py b/synapse/replication/slave/storage/_base.py index f19540d6bb..18076e0f3b 100644 --- a/synapse/replication/slave/storage/_base.py +++ b/synapse/replication/slave/storage/_base.py @@ -34,6 +34,9 @@ class BaseSlavedStore(SQLBaseStore): else: self._cache_id_gen = None + self.expire_cache_url = hs.config.worker_replication_url + "/expire_cache" + self.http_client = hs.get_simple_http_client() + def stream_positions(self): pos = {} if self._cache_id_gen: @@ -54,3 +57,19 @@ class BaseSlavedStore(SQLBaseStore): logger.info("Got unexpected cache_func: %r", cache_func) self._cache_id_gen.advance(int(stream["position"])) return defer.succeed(None) + + def _invalidate_cache_and_stream(self, txn, cache_func, keys): + txn.call_after(cache_func.invalidate, keys) + txn.call_after(self._send_invalidation_poke, cache_func, keys) + + @defer.inlineCallbacks + def _send_invalidation_poke(self, cache_func, keys): + try: + yield self.http_client.post_json_get_json(self.expire_cache_url, { + "invalidate": [{ + "name": cache_func.__name__, + "keys": list(keys), + }] + }) + except: + logger.exception("Failed to poke on expire_cache") diff --git a/synapse/replication/slave/storage/transactions.py b/synapse/replication/slave/storage/transactions.py index d92cea4ab1..fbb58f35da 100644 --- a/synapse/replication/slave/storage/transactions.py +++ b/synapse/replication/slave/storage/transactions.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer from ._base import BaseSlavedStore from synapse.storage import DataStore from synapse.storage.transactions import TransactionStore @@ -22,12 +21,10 @@ from synapse.storage.transactions import TransactionStore class TransactionStore(BaseSlavedStore): get_destination_retry_timings = TransactionStore.__dict__[ "get_destination_retry_timings" - ].orig + ] _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__ + set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__ + _set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__ prep_send_transaction = DataStore.prep_send_transaction.__func__ delivered_txn = DataStore.delivered_txn.__func__ - - # For now, don't record the destination rety timings - def set_destination_retry_timings(*args, **kwargs): - return defer.succeed(None) diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index adab520c78..ee2efb0d36 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -14,6 +14,7 @@ # limitations under the License. from ._base import SQLBaseStore +from synapse.storage.engines import PostgresEngine from synapse.util.caches.descriptors import cached from twisted.internet import defer @@ -200,25 +201,48 @@ class TransactionStore(SQLBaseStore): def _set_destination_retry_timings(self, txn, destination, retry_last_ts, retry_interval): - txn.call_after(self.get_destination_retry_timings.invalidate, (destination,)) + self.database_engine.lock_table(txn, "destinations") - self._simple_upsert_txn( + self._invalidate_cache_and_stream( + txn, self.get_destination_retry_timings, (destination,) + ) + + # We need to be careful here as the data may have changed from under us + # due to a worker setting the timings. + + prev_row = self._simple_select_one_txn( txn, - "destinations", + table="destinations", keyvalues={ "destination": destination, }, - values={ - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval, - }, - insertion_values={ - "destination": destination, - "retry_last_ts": retry_last_ts, - "retry_interval": retry_interval, - } + retcols=("retry_last_ts", "retry_interval"), + allow_none=True, ) + if not prev_row: + self._simple_insert_txn( + txn, + table="destinations", + values={ + "destination": destination, + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + } + ) + elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval: + self._simple_update_one_txn( + txn, + "destinations", + keyvalues={ + "destination": destination, + }, + updatevalues={ + "retry_last_ts": retry_last_ts, + "retry_interval": retry_interval, + }, + ) + def get_destinations_needing_retry(self): """Get all destinations which are due a retry for sending a transaction. diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 49527f4d21..46ef5a8ec7 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -121,12 +121,6 @@ class RetryDestinationLimiter(object): pass def __exit__(self, exc_type, exc_val, exc_tb): - def err(failure): - logger.exception( - "Failed to store set_destination_retry_timings", - failure.value - ) - valid_err_code = False if exc_type is not None and issubclass(exc_type, CodeMessageException): valid_err_code = 0 <= exc_val.code < 500 @@ -151,6 +145,15 @@ class RetryDestinationLimiter(object): retry_last_ts = int(self.clock.time_msec()) - self.store.set_destination_retry_timings( - self.destination, retry_last_ts, self.retry_interval - ).addErrback(err) + @defer.inlineCallbacks + def store_retry_timings(): + try: + yield self.store.set_destination_retry_timings( + self.destination, retry_last_ts, self.retry_interval + ) + except: + logger.exception( + "Failed to store set_destination_retry_timings", + ) + + store_retry_timings() -- cgit 1.4.1 From 4c79a63fd76e982e5e60b22c7efd15b6e3cf9915 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Nov 2016 10:40:44 +0000 Subject: Explicit federation ack --- synapse/app/federation_sender.py | 5 ++++- synapse/federation/send_queue.py | 13 +++++++++++-- synapse/replication/resource.py | 15 ++++++++++----- 3 files changed, 25 insertions(+), 8 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index ba2b4c2615..dcdbe79a17 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -235,7 +235,10 @@ class FederationSenderHandler(object): @defer.inlineCallbacks def stream_positions(self): stream_id = yield self.store.get_federation_out_pos("federation") - defer.returnValue({"federation": stream_id}) + defer.returnValue({ + "federation": stream_id, + "federation_ack": stream_id, + }) @defer.inlineCallbacks def process_replication(self, result): diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index ed2b03fad4..5c9f7a86f0 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -213,7 +213,15 @@ class FederationRemoteSendQueue(object): def get_current_token(self): return self.pos - 1 - def get_replication_rows(self, token, limit): + def get_replication_rows(self, token, limit, federation_ack=None): + """ + Args: + token (int) + limit (int) + federation_ack (int): Optional. The position where the worker is + explicitly acknowledged it has handled. Allows us to drop + data from before that point + """ # TODO: Handle limit. # To handle restarts where we wrap around @@ -224,7 +232,8 @@ class FederationRemoteSendQueue(object): # There should be only one reader, so lets delete everything its # acknowledged its seen. - self._clear_queue_before_pos(token) + if federation_ack: + self._clear_queue_before_pos(federation_ack) # Fetch changed presence keys = self.presence_changed.keys() diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index b05ca62710..cb9697e378 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -171,8 +171,13 @@ class ReplicationResource(Resource): } request_streams["streams"] = parse_string(request, "streams") + federation_ack = parse_integer(request, "federation_ack", None) + def replicate(): - return self.replicate(request_streams, limit) + return self.replicate( + request_streams, limit, + federation_ack=federation_ack + ) writer = yield self.notifier.wait_for_replication(replicate, timeout) result = writer.finish() @@ -190,7 +195,7 @@ class ReplicationResource(Resource): finish_request(request) @defer.inlineCallbacks - def replicate(self, request_streams, limit): + def replicate(self, request_streams, limit, federation_ack=None): writer = _Writer() current_token = yield self.current_replication_token() logger.debug("Replicating up to %r", current_token) @@ -209,7 +214,7 @@ class ReplicationResource(Resource): yield self.caches(writer, current_token, limit, request_streams) yield self.to_device(writer, current_token, limit, request_streams) yield self.public_rooms(writer, current_token, limit, request_streams) - self.federation(writer, current_token, limit, request_streams) + self.federation(writer, current_token, limit, request_streams, federation_ack) self.streams(writer, current_token, request_streams) logger.debug("Replicated %d rows", writer.total) @@ -473,7 +478,7 @@ class ReplicationResource(Resource): "position", "room_id", "visibility" ), position=upto_token) - def federation(self, writer, current_token, limit, request_streams): + def federation(self, writer, current_token, limit, request_streams, federation_ack): if self.config.send_federation: return @@ -483,7 +488,7 @@ class ReplicationResource(Resource): if federation is not None and federation != current_position: federation_rows = self.federation_sender.get_replication_rows( - federation, limit, + federation, limit, federation_ack=federation_ack, ) upto_token = _position_from_rows(federation_rows, current_position) writer.write_header_and_rows("federation", federation_rows, ( -- cgit 1.4.1 From 26072df6af7ca37b8e6e5f340a00e695de5c93d5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 23 Nov 2016 14:09:47 +0000 Subject: Ensure only main or federation_sender process can send federation traffic --- synapse/notifier.py | 11 +++++++++-- synapse/replication/resource.py | 2 +- synapse/server.py | 13 +++++++++++-- 3 files changed, 21 insertions(+), 5 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/notifier.py b/synapse/notifier.py index d528d1c1e0..054ca59ad2 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -143,7 +143,12 @@ class Notifier(object): self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() - self.federation_sender = hs.get_federation_sender() + + if hs.should_send_federation(): + self.federation_sender = hs.get_federation_sender() + else: + self.federation_sender = None + self.state_handler = hs.get_state_handler() self.clock.looping_call( @@ -220,7 +225,9 @@ class Notifier(object): """Notify any user streams that are interested in this room event""" # poke any interested application service. self.appservice_handler.notify_interested_services(room_stream_id) - self.federation_sender.notify_new_events(room_stream_id) + + if self.federation_sender: + self.federation_sender.notify_new_events(room_stream_id) if event.type == EventTypes.Member and event.membership == Membership.JOIN: self._user_joined_room(event.state_key, event.room_id) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index cb9697e378..d79b421cba 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -460,7 +460,7 @@ class ReplicationResource(Resource): ) upto_token = _position_from_rows(to_device_rows, current_position) writer.write_header_and_rows("to_device", to_device_rows, ( - "position", "entity", + "position", "user_id", "device_id", "message_json" ), position=upto_token) @defer.inlineCallbacks diff --git a/synapse/server.py b/synapse/server.py index 6c57ab3e18..ef75ab434c 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -274,14 +274,23 @@ class HomeServer(object): return TransportLayerClient(self) def build_federation_sender(self): - if self.config.send_federation: + if self.should_send_federation(): return TransactionQueue(self) - else: + elif not self.config.worker_app: return FederationRemoteSendQueue(self) + else: + raise Exception("Workers cannot send federation traffic") def remove_pusher(self, app_id, push_key, user_id): return self.get_pusherpool().remove_pusher(app_id, push_key, user_id) + def should_send_federation(self): + "Should this server be sending federation traffic directly?" + return self.config.send_federation and ( + not self.config.worker_app + or self.config.worker_app == "synapse.app.federation_sender" + ) + def _make_dependency_method(depname): def _get(hs): -- cgit 1.4.1 From f32fb6555246adec537bfbca1ddca19e8fe5f4ba Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 6 Dec 2016 10:43:48 +0000 Subject: Add new API appservice specific public room list --- synapse/appservice/__init__.py | 3 + synapse/appservice/api.py | 9 ++ synapse/federation/federation_client.py | 7 +- synapse/federation/transport/client.py | 9 +- synapse/federation/transport/server.py | 19 ++- synapse/handlers/directory.py | 12 ++ synapse/handlers/room_list.py | 57 +++++-- synapse/http/servlet.py | 8 +- synapse/replication/resource.py | 2 +- synapse/rest/client/v1/directory.py | 34 ++++ synapse/rest/client/v1/room.py | 19 ++- synapse/storage/room.py | 171 ++++++++++++++++++--- .../schema/delta/39/appservice_room_list.sql | 27 ++++ synapse/types.py | 34 ++++ 14 files changed, 371 insertions(+), 40 deletions(-) create mode 100644 synapse/storage/schema/delta/39/appservice_room_list.sql (limited to 'synapse/replication') diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py index 91471f7e89..b0106a3597 100644 --- a/synapse/appservice/__init__.py +++ b/synapse/appservice/__init__.py @@ -89,6 +89,9 @@ class ApplicationService(object): self.namespaces = self._check_namespaces(namespaces) self.id = id + if "|" in self.id: + raise Exception("application service ID cannot contain '|' character") + # .protocols is a publicly visible field if protocols: self.protocols = set(protocols) diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index b0eb0c6d9d..17dfc8e98c 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -19,6 +19,7 @@ from synapse.api.errors import CodeMessageException from synapse.http.client import SimpleHttpClient from synapse.events.utils import serialize_event from synapse.util.caches.response_cache import ResponseCache +from synapse.types import ThirdPartyInstanceID import logging import urllib @@ -177,6 +178,14 @@ class ApplicationServiceApi(SimpleHttpClient): " valid result", uri) defer.returnValue(None) + for instance in info.get("instances", []): + instance["appservice_id"] = service.id + network_id = instance.get("network_id", None) + if network_id is not None: + instance["network_id"] = ThirdPartyInstanceID( + service.id, network_id, + ).to_string() + defer.returnValue(info) except Exception as ex: logger.warning("query_3pe_protocol to %s threw exception %s", diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index b255709165..6e23c207ee 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -655,12 +655,15 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") def get_public_rooms(self, destination, limit=None, since_token=None, - search_filter=None): + search_filter=None, include_all_networks=False, + third_party_instance_id=None): if destination == self.server_name: return return self.transport_layer.get_public_rooms( - destination, limit, since_token, search_filter + destination, limit, since_token, search_filter, + include_all_networks=include_all_networks, + third_party_instance_id=third_party_instance_id, ) @defer.inlineCallbacks diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index db45c7826c..491cdc29e1 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -249,10 +249,15 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function def get_public_rooms(self, remote_server, limit, since_token, - search_filter=None): + search_filter=None, include_all_networks=False, + third_party_instance_id=None): path = PREFIX + "/publicRooms" - args = {} + args = { + "include_all_networks": "true" if include_all_networks else "false", + } + if third_party_instance_id: + args["third_party_instance_id"] = third_party_instance_id, if limit: args["limit"] = [str(limit)] if since_token: diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index fec337be64..159dbd1747 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -20,9 +20,11 @@ from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource from synapse.http.servlet import ( parse_json_object_from_request, parse_integer_from_args, parse_string_from_args, + parse_boolean_from_args, ) from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string +from synapse.types import ThirdPartyInstanceID import functools import logging @@ -558,8 +560,23 @@ class PublicRoomList(BaseFederationServlet): def on_GET(self, origin, content, query): limit = parse_integer_from_args(query, "limit", 0) since_token = parse_string_from_args(query, "since", None) + include_all_networks = parse_boolean_from_args( + query, "include_all_networks", False + ) + third_party_instance_id = parse_string_from_args( + query, "third_party_instance_id", None + ) + + if include_all_networks: + network_tuple = None + elif third_party_instance_id: + network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id) + else: + network_tuple = ThirdPartyInstanceID(None, None) + data = yield self.room_list_handler.get_local_public_room_list( - limit, since_token + limit, since_token, + network_tuple=network_tuple ) defer.returnValue((200, data)) diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index c00274afc3..08ed513ef8 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -339,3 +339,15 @@ class DirectoryHandler(BaseHandler): yield self.auth.check_can_change_room_list(room_id, requester.user) yield self.store.set_room_is_public(room_id, visibility == "public") + + @defer.inlineCallbacks + def edit_published_appservice_room_list(self, appservice_id, network_id, + room_id, visibility): + """Edit the appservice/network specific public room list. + """ + if visibility not in ["public", "private"]: + raise SynapseError(400, "Invalid visibility setting") + + yield self.store.set_room_is_public_appservice( + room_id, appservice_id, network_id, visibility == "public" + ) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index b04aea0110..712e5641d5 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -22,6 +22,7 @@ from synapse.api.constants import ( ) from synapse.util.async import concurrently_execute from synapse.util.caches.response_cache import ResponseCache +from synapse.types import ThirdPartyInstanceID from collections import namedtuple from unpaddedbase64 import encode_base64, decode_base64 @@ -34,6 +35,10 @@ logger = logging.getLogger(__name__) REMOTE_ROOM_LIST_POLL_INTERVAL = 60 * 1000 +# This is used to indicate we should only return rooms published to the main list. +EMTPY_THIRD_PARTY_ID = ThirdPartyInstanceID(None, None) + + class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) @@ -41,10 +46,27 @@ class RoomListHandler(BaseHandler): self.remote_response_cache = ResponseCache(hs, timeout_ms=30 * 1000) def get_local_public_room_list(self, limit=None, since_token=None, - search_filter=None): - if search_filter: + search_filter=None, + network_tuple=EMTPY_THIRD_PARTY_ID,): + """Generate a local public room list. + + There are multiple different lists: the main one plus one per third + party network. A client can ask for a specific list or to return all. + + Args: + limit (int) + since_token (str) + search_filter (dict) + network_tuple (ThirdPartyInstanceID): Which public list to use. + This can be (None, None) to indicate the main list, or a particular + appservice and network id to use an appservice specific one. + Setting to None returns all public rooms across all lists. + """ + if search_filter or network_tuple is not (None, None): # We explicitly don't bother caching searches. - return self._get_public_room_list(limit, since_token, search_filter) + return self._get_public_room_list( + limit, since_token, search_filter, network_tuple=network_tuple, + ) result = self.response_cache.get((limit, since_token)) if not result: @@ -56,7 +78,8 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def _get_public_room_list(self, limit=None, since_token=None, - search_filter=None): + search_filter=None, + network_tuple=EMTPY_THIRD_PARTY_ID,): if since_token and since_token != "END": since_token = RoomListNextBatch.from_token(since_token) else: @@ -73,14 +96,15 @@ class RoomListHandler(BaseHandler): current_public_id = yield self.store.get_current_public_room_stream_id() public_room_stream_id = since_token.public_room_stream_id newly_visible, newly_unpublished = yield self.store.get_public_room_changes( - public_room_stream_id, current_public_id + public_room_stream_id, current_public_id, + network_tuple=network_tuple, ) else: stream_token = yield self.store.get_room_max_stream_ordering() public_room_stream_id = yield self.store.get_current_public_room_stream_id() room_ids = yield self.store.get_public_room_ids_at_stream_id( - public_room_stream_id + public_room_stream_id, network_tuple=network_tuple, ) # We want to return rooms in a particular order: the number of joined @@ -311,7 +335,8 @@ class RoomListHandler(BaseHandler): @defer.inlineCallbacks def get_remote_public_room_list(self, server_name, limit=None, since_token=None, - search_filter=None): + search_filter=None, include_all_networks=False, + third_party_instance_id=None,): if search_filter: # We currently don't support searching across federation, so we have # to do it manually without pagination @@ -320,6 +345,8 @@ class RoomListHandler(BaseHandler): res = yield self._get_remote_list_cached( server_name, limit=limit, since_token=since_token, + include_all_networks=include_all_networks, + third_party_instance_id=third_party_instance_id, ) if search_filter: @@ -332,22 +359,30 @@ class RoomListHandler(BaseHandler): defer.returnValue(res) def _get_remote_list_cached(self, server_name, limit=None, since_token=None, - search_filter=None): + search_filter=None, include_all_networks=False, + third_party_instance_id=None,): repl_layer = self.hs.get_replication_layer() if search_filter: # We can't cache when asking for search return repl_layer.get_public_rooms( server_name, limit=limit, since_token=since_token, - search_filter=search_filter, + search_filter=search_filter, include_all_networks=include_all_networks, + third_party_instance_id=third_party_instance_id, ) - result = self.remote_response_cache.get((server_name, limit, since_token)) + key = ( + server_name, limit, since_token, include_all_networks, + third_party_instance_id, + ) + result = self.remote_response_cache.get(key) if not result: result = self.remote_response_cache.set( - (server_name, limit, since_token), + key, repl_layer.get_public_rooms( server_name, limit=limit, since_token=since_token, search_filter=search_filter, + include_all_networks=include_all_networks, + third_party_instance_id=third_party_instance_id, ) ) return result diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index 9346386238..8c22d6f00f 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -78,12 +78,16 @@ def parse_boolean(request, name, default=None, required=False): parameter is present and not one of "true" or "false". """ - if name in request.args: + return parse_boolean_from_args(request.args, name, default, required) + + +def parse_boolean_from_args(args, name, default=None, required=False): + if name in args: try: return { "true": True, "false": False, - }[request.args[name][0]] + }[args[name][0]] except: message = ( "Boolean query parameter %r must be one of" diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index d79b421cba..4616e9b34a 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -475,7 +475,7 @@ class ReplicationResource(Resource): ) upto_token = _position_from_rows(public_rooms_rows, current_position) writer.write_header_and_rows("public_rooms", public_rooms_rows, ( - "position", "room_id", "visibility" + "position", "room_id", "visibility", "appservice_id", "network_id", ), position=upto_token) def federation(self, writer, current_token, limit, request_streams, federation_ack): diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 09d0831594..8930f1826f 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -31,6 +31,7 @@ logger = logging.getLogger(__name__) def register_servlets(hs, http_server): ClientDirectoryServer(hs).register(http_server) ClientDirectoryListServer(hs).register(http_server) + ClientAppserviceDirectoryListServer(hs).register(http_server) class ClientDirectoryServer(ClientV1RestServlet): @@ -184,3 +185,36 @@ class ClientDirectoryListServer(ClientV1RestServlet): ) defer.returnValue((200, {})) + + +class ClientAppserviceDirectoryListServer(ClientV1RestServlet): + PATTERNS = client_path_patterns( + "/directory/list/appservice/(?P[^/]*)/(?P[^/]*)$" + ) + + def __init__(self, hs): + super(ClientAppserviceDirectoryListServer, self).__init__(hs) + self.store = hs.get_datastore() + self.handlers = hs.get_handlers() + + def on_PUT(self, request, network_id, room_id): + content = parse_json_object_from_request(request) + visibility = content.get("visibility", "public") + return self._edit(request, network_id, room_id, visibility) + + def on_DELETE(self, request, network_id, room_id): + return self._edit(request, network_id, room_id, "private") + + @defer.inlineCallbacks + def _edit(self, request, network_id, room_id, visibility): + requester = yield self.auth.get_user_by_req(request) + if not requester.app_service: + raise AuthError( + 403, "Only appservices can edit the appservice published room list" + ) + + yield self.handlers.directory_handler.edit_published_appservice_room_list( + requester.app_service.id, network_id, room_id, visibility, + ) + + defer.returnValue((200, {})) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 3fb1f2deb3..b13095405b 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -21,7 +21,7 @@ from synapse.api.errors import SynapseError, Codes, AuthError from synapse.streams.config import PaginationConfig from synapse.api.constants import EventTypes, Membership from synapse.api.filtering import Filter -from synapse.types import UserID, RoomID, RoomAlias +from synapse.types import UserID, RoomID, RoomAlias, ThirdPartyInstanceID from synapse.events.utils import serialize_event, format_event_for_client_v2 from synapse.http.servlet import ( parse_json_object_from_request, parse_string, parse_integer @@ -321,6 +321,20 @@ class PublicRoomListRestServlet(ClientV1RestServlet): since_token = content.get("since", None) search_filter = content.get("filter", None) + include_all_networks = content.get("include_all_networks", False) + third_party_instance_id = content.get("third_party_instance_id", None) + + if include_all_networks: + network_tuple = None + if third_party_instance_id is not None: + raise SynapseError( + 400, "Can't use include_all_networks with an explicit network" + ) + elif third_party_instance_id is None: + network_tuple = ThirdPartyInstanceID(None, None) + else: + network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id) + handler = self.hs.get_room_list_handler() if server: data = yield handler.get_remote_public_room_list( @@ -328,12 +342,15 @@ class PublicRoomListRestServlet(ClientV1RestServlet): limit=limit, since_token=since_token, search_filter=search_filter, + include_all_networks=include_all_networks, + third_party_instance_id=third_party_instance_id, ) else: data = yield handler.get_local_public_room_list( limit=limit, since_token=since_token, search_filter=search_filter, + network_tuple=network_tuple, ) defer.returnValue((200, data)) diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 11813b44f6..4b3605c776 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -106,7 +106,11 @@ class RoomStore(SQLBaseStore): entries = self._simple_select_list_txn( txn, table="public_room_list_stream", - keyvalues={"room_id": room_id}, + keyvalues={ + "room_id": room_id, + "appservice_id": None, + "network_id": None, + }, retcols=("stream_id", "visibility"), ) @@ -124,6 +128,8 @@ class RoomStore(SQLBaseStore): "stream_id": next_id, "room_id": room_id, "visibility": is_public, + "appservice_id": None, + "network_id": None, } ) @@ -133,6 +139,73 @@ class RoomStore(SQLBaseStore): set_room_is_public_txn, next_id, ) + @defer.inlineCallbacks + def set_room_is_public_appservice(self, room_id, appservice_id, network_id, + is_public): + """Edit the appservice/network specific public room list. + """ + def set_room_is_public_appservice_txn(txn, next_id): + if is_public: + try: + self._simple_insert_txn( + txn, + table="appservice_room_list", + values={ + "appservice_id": appservice_id, + "network_id": "network_id", + "room_id": room_id + }, + ) + except self.database_engine.module.IntegrityError: + # We've already inserted, nothing to do. + return + else: + self._simple_delete_txn( + txn, + table="appservice_room_list", + keyvalues={ + "appservice_id": appservice_id, + "network_id": network_id, + "room_id": room_id + }, + ) + + entries = self._simple_select_list_txn( + txn, + table="public_room_list_stream", + keyvalues={ + "room_id": room_id, + "appservice_id": appservice_id, + "network_id": network_id, + }, + retcols=("stream_id", "visibility"), + ) + + entries.sort(key=lambda r: r["stream_id"]) + + add_to_stream = True + if entries: + add_to_stream = bool(entries[-1]["visibility"]) != is_public + + if add_to_stream: + self._simple_insert_txn( + txn, + table="public_room_list_stream", + values={ + "stream_id": next_id, + "room_id": room_id, + "visibility": is_public, + "appservice_id": appservice_id, + "network_id": network_id, + } + ) + + with self._public_room_id_gen.get_next() as next_id: + yield self.runInteraction( + "set_room_is_public_appservice", + set_room_is_public_appservice_txn, next_id, + ) + def get_public_room_ids(self): return self._simple_select_onecol( table="rooms", @@ -259,38 +332,95 @@ class RoomStore(SQLBaseStore): def get_current_public_room_stream_id(self): return self._public_room_id_gen.get_current_token() - def get_public_room_ids_at_stream_id(self, stream_id): + def get_public_room_ids_at_stream_id(self, stream_id, network_tuple): + """Get pulbic rooms for a particular list, or across all lists. + + Args: + stream_id (int) + network_tuple (ThirdPartyInstanceID): The list to use (None, None) + means the main list, None means all lsits. + """ return self.runInteraction( "get_public_room_ids_at_stream_id", - self.get_public_room_ids_at_stream_id_txn, stream_id + self.get_public_room_ids_at_stream_id_txn, + stream_id, network_tuple=network_tuple ) - def get_public_room_ids_at_stream_id_txn(self, txn, stream_id): + def get_public_room_ids_at_stream_id_txn(self, txn, stream_id, + network_tuple): return { rm - for rm, vis in self.get_published_at_stream_id_txn(txn, stream_id).items() + for rm, vis in self.get_published_at_stream_id_txn( + txn, stream_id, network_tuple=network_tuple + ).items() if vis } - def get_published_at_stream_id_txn(self, txn, stream_id): - sql = (""" - SELECT room_id, visibility FROM public_room_list_stream - INNER JOIN ( - SELECT room_id, max(stream_id) AS stream_id + def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple): + if network_tuple: + # We want to get from a particular list. No aggregation required. + + sql = (""" + SELECT room_id, visibility FROM public_room_list_stream + INNER JOIN ( + SELECT room_id, max(stream_id) AS stream_id + FROM public_room_list_stream + WHERE stream_id <= ? %s + GROUP BY room_id + ) grouped USING (room_id, stream_id) + """) + + if network_tuple.appservice_id is not None: + txn.execute( + sql % ("AND appservice_id = ? AND network_id = ?",), + (stream_id, network_tuple.appservice_id, network_tuple.network_id,) + ) + else: + txn.execute( + sql % ("AND appservice_id IS NULL",), + (stream_id,) + ) + return dict(txn.fetchall()) + else: + # We want to get from all lists, so we need to aggregate the results + + logger.info("Executing full list") + + sql = (""" + SELECT room_id, visibility FROM public_room_list_stream - WHERE stream_id <= ? - GROUP BY room_id - ) grouped USING (room_id, stream_id) - """) + INNER JOIN ( + SELECT + room_id, max(stream_id) AS stream_id, appservice_id, + network_id + FROM public_room_list_stream + WHERE stream_id <= ? + GROUP BY room_id, appservice_id, network_id + ) grouped USING (room_id, stream_id) + """) - txn.execute(sql, (stream_id,)) - return dict(txn.fetchall()) + txn.execute( + sql, + (stream_id,) + ) + + results = {} + # A room is visible if its visible on any list. + for room_id, visibility in txn.fetchall(): + results[room_id] = bool(visibility) or results.get(room_id, False) + + return results - def get_public_room_changes(self, prev_stream_id, new_stream_id): + def get_public_room_changes(self, prev_stream_id, new_stream_id, + network_tuple): def get_public_room_changes_txn(txn): - then_rooms = self.get_public_room_ids_at_stream_id_txn(txn, prev_stream_id) + then_rooms = self.get_public_room_ids_at_stream_id_txn( + txn, prev_stream_id, network_tuple + ) - now_rooms_dict = self.get_published_at_stream_id_txn(txn, new_stream_id) + now_rooms_dict = self.get_published_at_stream_id_txn( + txn, new_stream_id, network_tuple + ) now_rooms_visible = set( rm for rm, vis in now_rooms_dict.items() if vis @@ -311,7 +441,8 @@ class RoomStore(SQLBaseStore): def get_all_new_public_rooms(self, prev_id, current_id, limit): def get_all_new_public_rooms(txn): sql = (""" - SELECT stream_id, room_id, visibility FROM public_room_list_stream + SELECT stream_id, room_id, visibility, appservice_id, network_id + FROM public_room_list_stream WHERE stream_id > ? AND stream_id <= ? ORDER BY stream_id ASC LIMIT ? diff --git a/synapse/storage/schema/delta/39/appservice_room_list.sql b/synapse/storage/schema/delta/39/appservice_room_list.sql new file mode 100644 index 0000000000..7e8344a577 --- /dev/null +++ b/synapse/storage/schema/delta/39/appservice_room_list.sql @@ -0,0 +1,27 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE TABLE appservice_room_list( + appservice_id TEXT NOT NULL, + network_id TEXT NOT NULL, + room_id TEXT NOT NULL +); + +CREATE UNIQUE INDEX appservice_room_list_idx ON appservice_room_list( + appservice_id, network_id, room_id +); + +ALTER TABLE public_room_list_stream ADD COLUMN appservice_id TEXT; +ALTER TABLE public_room_list_stream ADD COLUMN network_id TEXT; diff --git a/synapse/types.py b/synapse/types.py index ffab12df09..3a3ab21d17 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -274,3 +274,37 @@ class RoomStreamToken(namedtuple("_StreamToken", "topological stream")): return "t%d-%d" % (self.topological, self.stream) else: return "s%d" % (self.stream,) + + +class ThirdPartyInstanceID( + namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id")) +): + # Deny iteration because it will bite you if you try to create a singleton + # set by: + # users = set(user) + def __iter__(self): + raise ValueError("Attempted to iterate a %s" % (type(self).__name__,)) + + # Because this class is a namedtuple of strings, it is deeply immutable. + def __copy__(self): + return self + + def __deepcopy__(self, memo): + return self + + @classmethod + def from_string(cls, s): + bits = s.split("|", 2) + if len(bits) != 2: + raise SynapseError(400, "Invalid ID %r" % (s,)) + + return cls(appservice_id=bits[0], network_id=bits[1]) + + def to_string(self): + return "%s|%s" % (self.appservice_id, self.network_id,) + + __str__ = to_string + + @classmethod + def create(cls, appservice_id, network_id,): + return cls(appservice_id=appservice_id, network_id=network_id) -- cgit 1.4.1 From 09cbcb78d3f474ada72d604a24a44c91f2fdb6fe Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Dec 2016 14:41:51 +0000 Subject: Add cache to get_public_room_ids_at_stream_id --- synapse/replication/slave/storage/room.py | 3 ++- synapse/storage/room.py | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/replication') diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py index 23c613863f..6df9a25ef3 100644 --- a/synapse/replication/slave/storage/room.py +++ b/synapse/replication/slave/storage/room.py @@ -15,6 +15,7 @@ from ._base import BaseSlavedStore from synapse.storage import DataStore +from synapse.storage.room import RoomStore from ._slaved_id_tracker import SlavedIdTracker @@ -30,7 +31,7 @@ class RoomStore(BaseSlavedStore): DataStore.get_current_public_room_stream_id.__func__ ) get_public_room_ids_at_stream_id = ( - DataStore.get_public_room_ids_at_stream_id.__func__ + RoomStore.__dict__["get_public_room_ids_at_stream_id"] ) get_public_room_ids_at_stream_id_txn = ( DataStore.get_public_room_ids_at_stream_id_txn.__func__ diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 36aa8c5f83..8a2fe2fdf5 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -16,6 +16,7 @@ from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.util.caches.descriptors import cached from ._base import SQLBaseStore from .engines import PostgresEngine, Sqlite3Engine @@ -346,6 +347,7 @@ class RoomStore(SQLBaseStore): def get_current_public_room_stream_id(self): return self._public_room_id_gen.get_current_token() + @cached(num_args=2, max_entries=100) def get_public_room_ids_at_stream_id(self, stream_id, network_tuple): """Get pulbic rooms for a particular list, or across all lists. -- cgit 1.4.1