diff options
author | Erik Johnston <erikj@jki.re> | 2016-09-15 13:27:09 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-09-15 13:27:09 +0100 |
commit | e457034e99d46d32e3d71eddb12cda07d8a0370e (patch) | |
tree | 2b3cc4cbf5a17a6de91c6f07e47b038b22186921 | |
parent | Merge pull request #1117 from matrix-org/erikj/fix_state (diff) | |
parent | By default limit /publicRooms to 100 entries (diff) | |
download | synapse-e457034e99d46d32e3d71eddb12cda07d8a0370e.tar.xz |
Merge pull request #1121 from matrix-org/erikj/public_room_paginate
Add pagination support to publicRooms
-rw-r--r-- | synapse/config/server.py | 9 | ||||
-rw-r--r-- | synapse/federation/federation_client.py | 22 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 9 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 10 | ||||
-rw-r--r-- | synapse/handlers/room_list.py | 280 | ||||
-rw-r--r-- | synapse/http/servlet.py | 18 | ||||
-rw-r--r-- | synapse/python_dependencies.py | 1 | ||||
-rw-r--r-- | synapse/replication/resource.py | 20 | ||||
-rw-r--r-- | synapse/replication/slave/storage/events.py | 8 | ||||
-rw-r--r-- | synapse/replication/slave/storage/room.py | 31 | ||||
-rw-r--r-- | synapse/rest/client/v1/room.py | 18 | ||||
-rw-r--r-- | synapse/state.py | 5 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 3 | ||||
-rw-r--r-- | synapse/storage/event_federation.py | 81 | ||||
-rw-r--r-- | synapse/storage/room.py | 146 | ||||
-rw-r--r-- | synapse/storage/schema/delta/35/public_room_list_change_stream.sql | 33 | ||||
-rw-r--r-- | synapse/storage/schema/delta/35/stream_order_to_extrem.sql | 37 | ||||
-rw-r--r-- | synapse/storage/stream.py | 3 |
18 files changed, 595 insertions, 139 deletions
diff --git a/synapse/config/server.py b/synapse/config/server.py index 51eaf423ce..ed5417d0c3 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -29,7 +29,6 @@ class ServerConfig(Config): self.user_agent_suffix = config.get("user_agent_suffix") self.use_frozen_dicts = config.get("use_frozen_dicts", False) self.public_baseurl = config.get("public_baseurl") - self.secondary_directory_servers = config.get("secondary_directory_servers", []) if self.public_baseurl is not None: if self.public_baseurl[-1] != '/': @@ -142,14 +141,6 @@ class ServerConfig(Config): # The GC threshold parameters to pass to `gc.set_threshold`, if defined # gc_thresholds: [700, 10, 10] - # A list of other Home Servers to fetch the public room directory from - # and include in the public room directory of this home server - # This is a temporary stopgap solution to populate new server with a - # list of rooms until there exists a good solution of a decentralized - # room directory. - # secondary_directory_servers: - # - matrix.org - # List of ports that Synapse should listen on, their purpose and their # configuration. listeners: diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 91bed4746f..f0a684fc13 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -24,7 +24,6 @@ from synapse.api.errors import ( CodeMessageException, HttpResponseException, SynapseError, ) from synapse.util import unwrapFirstError -from synapse.util.async import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logutils import log_function from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred @@ -719,24 +718,11 @@ class FederationClient(FederationBase): raise RuntimeError("Failed to send to any server.") - @defer.inlineCallbacks - def get_public_rooms(self, destinations): - results_by_server = {} - - @defer.inlineCallbacks - def _get_result(s): - if s == self.server_name: - defer.returnValue() - - try: - result = yield self.transport_layer.get_public_rooms(s) - results_by_server[s] = result - except: - logger.exception("Error getting room list from server %r", s) - - yield concurrently_execute(_get_result, destinations, 3) + def get_public_rooms(self, destination, limit=None, since_token=None): + if destination == self.server_name: + return - defer.returnValue(results_by_server) + return self.transport_layer.get_public_rooms(destination, limit, since_token) @defer.inlineCallbacks def query_auth(self, destination, room_id, event_id, local_auth): diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 2b138526ba..f508b70f11 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -248,12 +248,19 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def get_public_rooms(self, remote_server): + def get_public_rooms(self, remote_server, limit, since_token): path = PREFIX + "/publicRooms" + args = {} + if limit: + args["limit"] = [str(limit)] + if since_token: + args["since"] = [since_token] + response = yield self.client.get_json( destination=remote_server, path=path, + args=args, ) defer.returnValue(response) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 37c0d4fbc4..fec337be64 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -18,7 +18,9 @@ from twisted.internet import defer from synapse.api.urls import FEDERATION_PREFIX as PREFIX from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource -from synapse.http.servlet import parse_json_object_from_request +from synapse.http.servlet import ( + parse_json_object_from_request, parse_integer_from_args, parse_string_from_args, +) from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string @@ -554,7 +556,11 @@ class PublicRoomList(BaseFederationServlet): @defer.inlineCallbacks def on_GET(self, origin, content, query): - data = yield self.room_list_handler.get_local_public_room_list() + limit = parse_integer_from_args(query, "limit", 0) + since_token = parse_string_from_args(query, "since", None) + data = yield self.room_list_handler.get_local_public_room_list( + limit, since_token + ) defer.returnValue((200, data)) diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py index d72e8c99f9..28bc35f8a3 100644 --- a/synapse/handlers/room_list.py +++ b/synapse/handlers/room_list.py @@ -18,13 +18,16 @@ from twisted.internet import defer from ._base import BaseHandler from synapse.api.constants import ( - EventTypes, JoinRules, Membership, + EventTypes, JoinRules, ) -from synapse.api.errors import SynapseError from synapse.util.async import concurrently_execute from synapse.util.caches.response_cache import ResponseCache +from collections import namedtuple +from unpaddedbase64 import encode_base64, decode_base64 + import logging +import msgpack logger = logging.getLogger(__name__) @@ -35,28 +38,130 @@ class RoomListHandler(BaseHandler): def __init__(self, hs): super(RoomListHandler, self).__init__(hs) self.response_cache = ResponseCache(hs) - self.remote_list_request_cache = ResponseCache(hs) - self.remote_list_cache = {} - self.fetch_looping_call = hs.get_clock().looping_call( - self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL - ) - self.fetch_all_remote_lists() - def get_local_public_room_list(self): - result = self.response_cache.get(()) + def get_local_public_room_list(self, limit=None, since_token=None): + result = self.response_cache.get((limit, since_token)) if not result: - result = self.response_cache.set((), self._get_public_room_list()) + result = self.response_cache.set( + (limit, since_token), + self._get_public_room_list(limit, since_token) + ) return result @defer.inlineCallbacks - def _get_public_room_list(self): - room_ids = yield self.store.get_public_room_ids() + def _get_public_room_list(self, limit=None, since_token=None): + if since_token and since_token != "END": + since_token = RoomListNextBatch.from_token(since_token) + else: + since_token = None + + rooms_to_order_value = {} + rooms_to_num_joined = {} + rooms_to_latest_event_ids = {} + + newly_visible = [] + newly_unpublished = [] + if since_token: + stream_token = since_token.stream_ordering + current_public_id = yield self.store.get_current_public_room_stream_id() + public_room_stream_id = since_token.public_room_stream_id + newly_visible, newly_unpublished = yield self.store.get_public_room_changes( + public_room_stream_id, current_public_id + ) + else: + stream_token = yield self.store.get_room_max_stream_ordering() + public_room_stream_id = yield self.store.get_current_public_room_stream_id() + + room_ids = yield self.store.get_public_room_ids_at_stream_id( + public_room_stream_id + ) + + # We want to return rooms in a particular order: the number of joined + # users. We then arbitrarily use the room_id as a tie breaker. + + @defer.inlineCallbacks + def get_order_for_room(room_id): + latest_event_ids = rooms_to_latest_event_ids.get(room_id, None) + if not latest_event_ids: + latest_event_ids = yield self.store.get_forward_extremeties_for_room( + room_id, stream_token + ) + rooms_to_latest_event_ids[room_id] = latest_event_ids + + if not latest_event_ids: + return + + joined_users = yield self.state_handler.get_current_user_in_room( + room_id, latest_event_ids, + ) + num_joined_users = len(joined_users) + rooms_to_num_joined[room_id] = num_joined_users + + if num_joined_users == 0: + return + + # We want larger rooms to be first, hence negating num_joined_users + rooms_to_order_value[room_id] = (-num_joined_users, room_id) + + yield concurrently_execute(get_order_for_room, room_ids, 10) + + sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1]) + sorted_rooms = [room_id for room_id, _ in sorted_entries] + + if since_token: + if since_token.direction_is_forward: + sorted_rooms = sorted_rooms[since_token.current_limit:] + else: + sorted_rooms = sorted_rooms[:since_token.current_limit] + sorted_rooms.reverse() - results = [] + new_limit = None + if limit: + if sorted_rooms[limit:]: + new_limit = limit + if since_token: + if since_token.direction_is_forward: + new_limit += since_token.current_limit + else: + new_limit = since_token.current_limit - new_limit + new_limit = max(0, new_limit) + sorted_rooms = sorted_rooms[:limit] + + chunk = [] @defer.inlineCallbacks def handle_room(room_id): - current_state = yield self.state_handler.get_current_state(room_id) + num_joined_users = rooms_to_num_joined[room_id] + if num_joined_users == 0: + return + + if room_id in newly_unpublished: + return + + result = { + "room_id": room_id, + "num_joined_members": num_joined_users, + } + + current_state_ids = yield self.state_handler.get_current_state_ids(room_id) + + event_map = yield self.store.get_events([ + event_id for key, event_id in current_state_ids.items() + if key[0] in ( + EventTypes.JoinRules, + EventTypes.Name, + EventTypes.Topic, + EventTypes.CanonicalAlias, + EventTypes.RoomHistoryVisibility, + EventTypes.GuestAccess, + "m.room.avatar", + ) + ]) + + current_state = { + (ev.type, ev.state_key): ev + for ev in event_map.values() + } # Double check that this is actually a public room. join_rules_event = current_state.get((EventTypes.JoinRules, "")) @@ -65,18 +170,6 @@ class RoomListHandler(BaseHandler): if join_rule and join_rule != JoinRules.PUBLIC: defer.returnValue(None) - result = {"room_id": room_id} - - num_joined_users = len([ - 1 for _, event in current_state.items() - if event.type == EventTypes.Member - and event.membership == Membership.JOIN - ]) - if num_joined_users == 0: - return - - result["num_joined_members"] = num_joined_users - aliases = yield self.store.get_aliases_for_room(room_id) if aliases: result["aliases"] = aliases @@ -117,68 +210,87 @@ class RoomListHandler(BaseHandler): if avatar_url: result["avatar_url"] = avatar_url - results.append(result) + chunk.append(result) - yield concurrently_execute(handle_room, room_ids, 10) + yield concurrently_execute(handle_room, sorted_rooms, 10) - # FIXME (erikj): START is no longer a valid value - defer.returnValue({"start": "START", "end": "END", "chunk": results}) + chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"])) - @defer.inlineCallbacks - def fetch_all_remote_lists(self): - deferred = self.hs.get_replication_layer().get_public_rooms( - self.hs.config.secondary_directory_servers - ) - self.remote_list_request_cache.set((), deferred) - self.remote_list_cache = yield deferred + results = { + "chunk": chunk, + } + + if since_token: + results["new_rooms"] = bool(newly_visible) + + if not since_token or since_token.direction_is_forward: + if new_limit: + results["next_batch"] = RoomListNextBatch( + stream_ordering=stream_token, + public_room_stream_id=public_room_stream_id, + current_limit=new_limit, + direction_is_forward=True, + ).to_token() + + if since_token: + results["prev_batch"] = since_token.copy_and_replace( + direction_is_forward=False, + ).to_token() + else: + if new_limit: + results["prev_batch"] = RoomListNextBatch( + stream_ordering=stream_token, + public_room_stream_id=public_room_stream_id, + current_limit=new_limit, + direction_is_forward=False, + ).to_token() + + if since_token: + results["next_batch"] = since_token.copy_and_replace( + direction_is_forward=True, + ).to_token() + + defer.returnValue(results) @defer.inlineCallbacks - def get_remote_public_room_list(self, server_name): + def get_remote_public_room_list(self, server_name, limit=None, since_token=None): res = yield self.hs.get_replication_layer().get_public_rooms( - [server_name] + server_name, limit=limit, since_token=since_token, ) - if server_name not in res: - raise SynapseError(404, "Server not found") - defer.returnValue(res[server_name]) - - @defer.inlineCallbacks - def get_aggregated_public_room_list(self): - """ - Get the public room list from this server and the servers - specified in the secondary_directory_servers config option. - XXX: Pagination... - """ - # We return the results from out cache which is updated by a looping call, - # unless we're missing a cache entry, in which case wait for the result - # of the fetch if there's one in progress. If not, omit that server. - wait = False - for s in self.hs.config.secondary_directory_servers: - if s not in self.remote_list_cache: - logger.warn("No cached room list from %s: waiting for fetch", s) - wait = True - break - - if wait and self.remote_list_request_cache.get(()): - yield self.remote_list_request_cache.get(()) - - public_rooms = yield self.get_local_public_room_list() - - # keep track of which room IDs we've seen so we can de-dup - room_ids = set() - - # tag all the ones in our list with our server name. - # Also add the them to the de-deping set - for room in public_rooms['chunk']: - room["server_name"] = self.hs.hostname - room_ids.add(room["room_id"]) - - # Now add the results from federation - for server_name, server_result in self.remote_list_cache.items(): - for room in server_result["chunk"]: - if room["room_id"] not in room_ids: - room["server_name"] = server_name - public_rooms["chunk"].append(room) - room_ids.add(room["room_id"]) - - defer.returnValue(public_rooms) + defer.returnValue(res) + + +class RoomListNextBatch(namedtuple("RoomListNextBatch", ( + "stream_ordering", # stream_ordering of the first public room list + "public_room_stream_id", # public room stream id for first public room list + "current_limit", # The number of previous rooms returned + "direction_is_forward", # Bool if this is a next_batch, false if prev_batch +))): + + KEY_DICT = { + "stream_ordering": "s", + "public_room_stream_id": "p", + "current_limit": "n", + "direction_is_forward": "d", + } + + REVERSE_KEY_DICT = {v: k for k, v in KEY_DICT.items()} + + @classmethod + def from_token(cls, token): + return RoomListNextBatch(**{ + cls.REVERSE_KEY_DICT[key]: val + for key, val in msgpack.loads(decode_base64(token)).items() + }) + + def to_token(self): + return encode_base64(msgpack.dumps({ + self.KEY_DICT[key]: val + for key, val in self._asdict().items() + })) + + def copy_and_replace(self, **kwds): + return self._replace( + **kwds + ) diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py index e41afeab8e..9346386238 100644 --- a/synapse/http/servlet.py +++ b/synapse/http/servlet.py @@ -41,9 +41,13 @@ def parse_integer(request, name, default=None, required=False): SynapseError: if the parameter is absent and required, or if the parameter is present and not an integer. """ - if name in request.args: + return parse_integer_from_args(request.args, name, default, required) + + +def parse_integer_from_args(args, name, default=None, required=False): + if name in args: try: - return int(request.args[name][0]) + return int(args[name][0]) except: message = "Query parameter %r must be an integer" % (name,) raise SynapseError(400, message) @@ -116,9 +120,15 @@ def parse_string(request, name, default=None, required=False, parameter is present, must be one of a list of allowed values and is not one of those allowed values. """ + return parse_string_from_args( + request.args, name, default, required, allowed_values, param_type, + ) - if name in request.args: - value = request.args[name][0] + +def parse_string_from_args(args, name, default=None, required=False, + allowed_values=None, param_type="string"): + if name in args: + value = args[name][0] if allowed_values is not None and value not in allowed_values: message = "Query parameter %r must be one of [%s]" % ( name, ", ".join(repr(v) for v in allowed_values) diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py index 86e3d89154..b9e41770ee 100644 --- a/synapse/python_dependencies.py +++ b/synapse/python_dependencies.py @@ -36,6 +36,7 @@ REQUIREMENTS = { "blist": ["blist"], "pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"], "pymacaroons-pynacl": ["pymacaroons"], + "msgpack-python>=0.3.0": ["msgpack"], } CONDITIONAL_REQUIREMENTS = { "web_client": { diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index 299e9419a4..9aab3ce23c 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -42,6 +42,7 @@ STREAM_NAMES = ( ("pushers",), ("caches",), ("to_device",), + ("public_rooms",), ) @@ -131,6 +132,7 @@ class ReplicationResource(Resource): push_rules_token, room_stream_token = self.store.get_push_rules_stream_token() pushers_token = self.store.get_pushers_stream_token() caches_token = self.store.get_cache_stream_token() + public_rooms_token = self.store.get_current_public_room_stream_id() defer.returnValue(_ReplicationToken( room_stream_token, @@ -144,6 +146,7 @@ class ReplicationResource(Resource): 0, # State stream is no longer a thing caches_token, int(stream_token.to_device_key), + int(public_rooms_token), )) @request_handler() @@ -193,6 +196,7 @@ class ReplicationResource(Resource): yield self.pushers(writer, current_token, limit, request_streams) yield self.caches(writer, current_token, limit, request_streams) yield self.to_device(writer, current_token, limit, request_streams) + yield self.public_rooms(writer, current_token, limit, request_streams) self.streams(writer, current_token, request_streams) logger.debug("Replicated %d rows", writer.total) @@ -400,6 +404,20 @@ class ReplicationResource(Resource): "position", "user_id", "device_id", "message_json" )) + @defer.inlineCallbacks + def public_rooms(self, writer, current_token, limit, request_streams): + current_position = current_token.public_rooms + + public_rooms = request_streams.get("public_rooms") + + if public_rooms is not None: + public_rooms_rows = yield self.store.get_all_new_public_rooms( + public_rooms, current_position, limit + ) + writer.write_header_and_rows("public_rooms", public_rooms_rows, ( + "position", "room_id", "visibility" + )) + class _Writer(object): """Writes the streams as a JSON object as the response to the request""" @@ -428,7 +446,7 @@ class _Writer(object): class _ReplicationToken(collections.namedtuple("_ReplicationToken", ( "events", "presence", "typing", "receipts", "account_data", "backfill", - "push_rules", "pushers", "state", "caches", "to_device", + "push_rules", "pushers", "state", "caches", "to_device", "public_rooms", ))): __slots__ = [] diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index 15c52774a2..f8965c73a0 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -61,6 +61,8 @@ class SlavedEventStore(BaseSlavedStore): "MembershipStreamChangeCache", events_max, ) + self.stream_ordering_month_ago = 0 + # Cached functions can't be accessed through a class instance so we need # to reach inside the __dict__ to extract them. get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"] @@ -168,6 +170,12 @@ class SlavedEventStore(BaseSlavedStore): get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__ _get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__ + get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__ + + get_forward_extremeties_for_room = ( + DataStore.get_forward_extremeties_for_room.__func__ + ) + def stream_positions(self): result = super(SlavedEventStore, self).stream_positions() result["events"] = self._stream_id_gen.get_current_token() diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py index d5bb0f98ea..81743941dc 100644 --- a/synapse/replication/slave/storage/room.py +++ b/synapse/replication/slave/storage/room.py @@ -15,7 +15,38 @@ from ._base import BaseSlavedStore from synapse.storage import DataStore +from ._slaved_id_tracker import SlavedIdTracker class RoomStore(BaseSlavedStore): + def __init__(self, db_conn, hs): + super(RoomStore, self).__init__(db_conn, hs) + self._public_room_id_gen = SlavedIdTracker( + db_conn, "public_room_list_stream", "stream_id" + ) + get_public_room_ids = DataStore.get_public_room_ids.__func__ + get_current_public_room_stream_id = ( + DataStore.get_current_public_room_stream_id.__func__ + ) + get_public_room_ids_at_stream_id = ( + DataStore.get_public_room_ids_at_stream_id.__func__ + ) + get_public_room_ids_at_stream_id_txn = ( + DataStore.get_public_room_ids_at_stream_id_txn.__func__ + ) + get_published_at_stream_id_txn = ( + DataStore.get_published_at_stream_id_txn.__func__ + ) + + def stream_positions(self): + result = super(RoomStore, self).stream_positions() + result["public_rooms"] = self._public_room_id_gen.get_current_token() + return result + + def process_replication(self, result): + stream = result.get("public_rooms") + if stream: + self._public_room_id_gen.advance(int(stream["position"])) + + return super(RoomStore, self).process_replication(result) diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 22d6a7d31e..924c785358 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -23,7 +23,9 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.filtering import Filter from synapse.types import UserID, RoomID, RoomAlias from synapse.events.utils import serialize_event, format_event_for_client_v2 -from synapse.http.servlet import parse_json_object_from_request, parse_string +from synapse.http.servlet import ( + parse_json_object_from_request, parse_string, parse_integer +) import logging import urllib @@ -317,11 +319,21 @@ class PublicRoomListRestServlet(ClientV1RestServlet): else: pass + limit = parse_integer(request, "limit", 100) + since_token = parse_string(request, "since", None) + handler = self.hs.get_room_list_handler() if server: - data = yield handler.get_remote_public_room_list(server) + data = yield handler.get_remote_public_room_list( + server, + limit=limit, + since_token=since_token, + ) else: - data = yield handler.get_aggregated_public_room_list() + data = yield handler.get_local_public_room_list( + limit=limit, + since_token=since_token, + ) defer.returnValue((200, data)) diff --git a/synapse/state.py b/synapse/state.py index d89aca26b1..b4eca0e5d5 100644 --- a/synapse/state.py +++ b/synapse/state.py @@ -157,8 +157,9 @@ class StateHandler(object): defer.returnValue(state) @defer.inlineCallbacks - def get_current_user_in_room(self, room_id): - latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) + def get_current_user_in_room(self, room_id, latest_event_ids=None): + if not latest_event_ids: + latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id) entry = yield self.resolve_state_groups(room_id, latest_event_ids) joined_users = yield self.store.get_joined_users_from_state( room_id, entry.state_id, entry.state diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index a61e83d5de..0099a3f5bb 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -113,6 +113,9 @@ class DataStore(RoomMemberStore, RoomStore, self._device_inbox_id_gen = StreamIdGenerator( db_conn, "device_max_stream_id", "stream_id" ) + self._public_room_id_gen = StreamIdGenerator( + db_conn, "public_room_list_stream", "stream_id" + ) self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id") self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id") diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 0827946207..ec6dbe5492 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -16,6 +16,7 @@ from twisted.internet import defer from ._base import SQLBaseStore +from synapse.api.errors import StoreError from synapse.util.caches.descriptors import cached from unpaddedbase64 import encode_base64 @@ -36,6 +37,13 @@ class EventFederationStore(SQLBaseStore): and backfilling from another server respectively. """ + def __init__(self, hs): + super(EventFederationStore, self).__init__(hs) + + hs.get_clock().looping_call( + self._delete_old_forward_extrem_cache, 60 * 60 * 1000 + ) + def get_auth_chain(self, event_ids): return self.get_auth_chain_ids(event_ids).addCallback(self._get_events) @@ -270,6 +278,37 @@ class EventFederationStore(SQLBaseStore): ] ) + # We now insert into stream_ordering_to_exterm a mapping from room_id, + # new stream_ordering to new forward extremeties in the room. + # This allows us to later efficiently look up the forward extremeties + # for a room before a given stream_ordering + max_stream_ord = max( + ev.internal_metadata.stream_ordering for ev in events + ) + new_extrem = {} + for room_id in events_by_room: + event_ids = self._simple_select_onecol_txn( + txn, + table="event_forward_extremities", + keyvalues={"room_id": room_id}, + retcol="event_id", + ) + new_extrem[room_id] = event_ids + + self._simple_insert_many_txn( + txn, + table="stream_ordering_to_exterm", + values=[ + { + "room_id": room_id, + "event_id": event_id, + "stream_ordering": max_stream_ord, + } + for room_id, extrem_evs in new_extrem.items() + for event_id in extrem_evs + ] + ) + query = ( "INSERT INTO event_backward_extremities (event_id, room_id)" " SELECT ?, ? WHERE NOT EXISTS (" @@ -305,6 +344,48 @@ class EventFederationStore(SQLBaseStore): self.get_latest_event_ids_in_room.invalidate, (room_id,) ) + def get_forward_extremeties_for_room(self, room_id, stream_ordering): + """For a given room_id and stream_ordering, return the forward + extremeties of the room at that point in "time". + + Throws a StoreError if we have since purged the index for + stream_orderings from that point. + """ + + if stream_ordering <= self.stream_ordering_month_ago: + raise StoreError(400, "stream_ordering too old") + + sql = (""" + SELECT event_id FROM stream_ordering_to_exterm + INNER JOIN ( + SELECT room_id, MAX(stream_ordering) AS stream_ordering + FROM stream_ordering_to_exterm + WHERE stream_ordering < ? GROUP BY room_id + ) AS rms USING (room_id, stream_ordering) + WHERE room_id = ? + """) + + def get_forward_extremeties_for_room_txn(txn): + txn.execute(sql, (stream_ordering, room_id)) + rows = txn.fetchall() + return [event_id for event_id, in rows] + + return self.runInteraction( + "get_forward_extremeties_for_room", + get_forward_extremeties_for_room_txn + ) + + def _delete_old_forward_extrem_cache(self): + def _delete_old_forward_extrem_cache_txn(txn): + txn.execute( + "DELETE FROM stream_ordering_to_exterm WHERE stream_ordering < ?", + (self.stream_ordering_month_ago,) + ) + return self.runInteraction( + "_delete_old_forward_extrem_cache", + _delete_old_forward_extrem_cache_txn + ) + def get_backfill_events(self, room_id, event_list, limit): """Get a list of Events for a given topic that occurred before (and including) the events in event_list. Return a list of max size `limit` diff --git a/synapse/storage/room.py b/synapse/storage/room.py index 8251f58670..2ef13d7403 100644 --- a/synapse/storage/room.py +++ b/synapse/storage/room.py @@ -48,15 +48,31 @@ class RoomStore(SQLBaseStore): StoreError if the room could not be stored. """ try: - yield self._simple_insert( - "rooms", - { - "room_id": room_id, - "creator": room_creator_user_id, - "is_public": is_public, - }, - desc="store_room", - ) + def store_room_txn(txn, next_id): + self._simple_insert_txn( + txn, + "rooms", + { + "room_id": room_id, + "creator": room_creator_user_id, + "is_public": is_public, + }, + ) + if is_public: + self._simple_insert_txn( + txn, + table="public_room_list_stream", + values={ + "stream_id": next_id, + "room_id": room_id, + "visibility": is_public, + } + ) + with self._public_room_id_gen.get_next() as next_id: + yield self.runInteraction( + "store_room_txn", + store_room_txn, next_id, + ) except Exception as e: logger.error("store_room with room_id=%s failed: %s", room_id, e) raise StoreError(500, "Problem creating room.") @@ -77,13 +93,45 @@ class RoomStore(SQLBaseStore): allow_none=True, ) + @defer.inlineCallbacks def set_room_is_public(self, room_id, is_public): - return self._simple_update_one( - table="rooms", - keyvalues={"room_id": room_id}, - updatevalues={"is_public": is_public}, - desc="set_room_is_public", - ) + def set_room_is_public_txn(txn, next_id): + self._simple_update_one_txn( + txn, + table="rooms", + keyvalues={"room_id": room_id}, + updatevalues={"is_public": is_public}, + ) + + entries = self._simple_select_list_txn( + txn, + table="public_room_list_stream", + keyvalues={"room_id": room_id}, + retcols=("stream_id", "visibility"), + ) + + entries.sort(key=lambda r: r["stream_id"]) + + add_to_stream = True + if entries: + add_to_stream = bool(entries[-1]["visibility"]) != is_public + + if add_to_stream: + self._simple_insert_txn( + txn, + table="public_room_list_stream", + values={ + "stream_id": next_id, + "room_id": room_id, + "visibility": is_public, + } + ) + + with self._public_room_id_gen.get_next() as next_id: + yield self.runInteraction( + "set_room_is_public", + set_room_is_public_txn, next_id, + ) def get_public_room_ids(self): return self._simple_select_onecol( @@ -207,3 +255,71 @@ class RoomStore(SQLBaseStore): }, desc="add_event_report" ) + + def get_current_public_room_stream_id(self): + return self._public_room_id_gen.get_current_token() + + def get_public_room_ids_at_stream_id(self, stream_id): + return self.runInteraction( + "get_public_room_ids_at_stream_id", + self.get_public_room_ids_at_stream_id_txn, stream_id + ) + + def get_public_room_ids_at_stream_id_txn(self, txn, stream_id): + return { + rm + for rm, vis in self.get_published_at_stream_id_txn(txn, stream_id).items() + if vis + } + + def get_published_at_stream_id_txn(self, txn, stream_id): + sql = (""" + SELECT room_id, visibility FROM public_room_list_stream + INNER JOIN ( + SELECT room_id, max(stream_id) AS stream_id + FROM public_room_list_stream + WHERE stream_id <= ? + GROUP BY room_id + ) grouped USING (room_id, stream_id) + """) + + txn.execute(sql, (stream_id,)) + return dict(txn.fetchall()) + + def get_public_room_changes(self, prev_stream_id, new_stream_id): + def get_public_room_changes_txn(txn): + then_rooms = self.get_public_room_ids_at_stream_id_txn(txn, prev_stream_id) + + now_rooms_dict = self.get_published_at_stream_id_txn(txn, new_stream_id) + + now_rooms_visible = set( + rm for rm, vis in now_rooms_dict.items() if vis + ) + now_rooms_not_visible = set( + rm for rm, vis in now_rooms_dict.items() if not vis + ) + + newly_visible = now_rooms_visible - then_rooms + newly_unpublished = now_rooms_not_visible & then_rooms + + return newly_visible, newly_unpublished + + return self.runInteraction( + "get_public_room_changes", get_public_room_changes_txn + ) + + def get_all_new_public_rooms(self, prev_id, current_id, limit): + def get_all_new_public_rooms(txn): + sql = (""" + SELECT stream_id, room_id, visibility FROM public_room_list_stream + WHERE stream_id > ? AND stream_id <= ? + ORDER BY stream_id ASC + LIMIT ? + """) + + txn.execute(sql, (prev_id, current_id, limit,)) + return txn.fetchall() + + return self.runInteraction( + "get_all_new_public_rooms", get_all_new_public_rooms + ) diff --git a/synapse/storage/schema/delta/35/public_room_list_change_stream.sql b/synapse/storage/schema/delta/35/public_room_list_change_stream.sql new file mode 100644 index 0000000000..dd2bf2e28a --- /dev/null +++ b/synapse/storage/schema/delta/35/public_room_list_change_stream.sql @@ -0,0 +1,33 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE public_room_list_stream ( + stream_id BIGINT NOT NULL, + room_id TEXT NOT NULL, + visibility BOOLEAN NOT NULL +); + +INSERT INTO public_room_list_stream (stream_id, room_id, visibility) + SELECT 1, room_id, is_public FROM rooms + WHERE is_public = CAST(1 AS BOOLEAN); + +CREATE INDEX public_room_list_stream_idx on public_room_list_stream( + stream_id +); + +CREATE INDEX public_room_list_stream_rm_idx on public_room_list_stream( + room_id, stream_id +); diff --git a/synapse/storage/schema/delta/35/stream_order_to_extrem.sql b/synapse/storage/schema/delta/35/stream_order_to_extrem.sql new file mode 100644 index 0000000000..2b945d8a57 --- /dev/null +++ b/synapse/storage/schema/delta/35/stream_order_to_extrem.sql @@ -0,0 +1,37 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +CREATE TABLE stream_ordering_to_exterm ( + stream_ordering BIGINT NOT NULL, + room_id TEXT NOT NULL, + event_id TEXT NOT NULL +); + +INSERT INTO stream_ordering_to_exterm (stream_ordering, room_id, event_id) + SELECT stream_ordering, room_id, event_id FROM event_forward_extremities + INNER JOIN ( + SELECT room_id, max(stream_ordering) as stream_ordering FROM events + INNER JOIN event_forward_extremities USING (room_id, event_id) + GROUP BY room_id + ) AS rms USING (room_id); + +CREATE INDEX stream_ordering_to_exterm_idx on stream_ordering_to_exterm( + stream_ordering +); + +CREATE INDEX stream_ordering_to_exterm_rm_idx on stream_ordering_to_exterm( + room_id, stream_ordering +); diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 0577a0525b..07ea969d4d 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -531,6 +531,9 @@ class StreamStore(SQLBaseStore): ) defer.returnValue("t%d-%d" % (topo, token)) + def get_room_max_stream_ordering(self): + return self._stream_id_gen.get_current_token() + def get_stream_token_for_event(self, event_id): """The stream token for an event Args: |