diff --git a/synapse/config/server.py b/synapse/config/server.py
index 51eaf423ce..ed5417d0c3 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -29,7 +29,6 @@ class ServerConfig(Config):
self.user_agent_suffix = config.get("user_agent_suffix")
self.use_frozen_dicts = config.get("use_frozen_dicts", False)
self.public_baseurl = config.get("public_baseurl")
- self.secondary_directory_servers = config.get("secondary_directory_servers", [])
if self.public_baseurl is not None:
if self.public_baseurl[-1] != '/':
@@ -142,14 +141,6 @@ class ServerConfig(Config):
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
# gc_thresholds: [700, 10, 10]
- # A list of other Home Servers to fetch the public room directory from
- # and include in the public room directory of this home server
- # This is a temporary stopgap solution to populate new server with a
- # list of rooms until there exists a good solution of a decentralized
- # room directory.
- # secondary_directory_servers:
- # - matrix.org
-
# List of ports that Synapse should listen on, their purpose and their
# configuration.
listeners:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 91bed4746f..f0a684fc13 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -24,7 +24,6 @@ from synapse.api.errors import (
CodeMessageException, HttpResponseException, SynapseError,
)
from synapse.util import unwrapFirstError
-from synapse.util.async import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
@@ -719,24 +718,11 @@ class FederationClient(FederationBase):
raise RuntimeError("Failed to send to any server.")
- @defer.inlineCallbacks
- def get_public_rooms(self, destinations):
- results_by_server = {}
-
- @defer.inlineCallbacks
- def _get_result(s):
- if s == self.server_name:
- defer.returnValue()
-
- try:
- result = yield self.transport_layer.get_public_rooms(s)
- results_by_server[s] = result
- except:
- logger.exception("Error getting room list from server %r", s)
-
- yield concurrently_execute(_get_result, destinations, 3)
+ def get_public_rooms(self, destination, limit=None, since_token=None):
+ if destination == self.server_name:
+ return
- defer.returnValue(results_by_server)
+ return self.transport_layer.get_public_rooms(destination, limit, since_token)
@defer.inlineCallbacks
def query_auth(self, destination, room_id, event_id, local_auth):
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 2b138526ba..f508b70f11 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -248,12 +248,19 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
- def get_public_rooms(self, remote_server):
+ def get_public_rooms(self, remote_server, limit, since_token):
path = PREFIX + "/publicRooms"
+ args = {}
+ if limit:
+ args["limit"] = [str(limit)]
+ if since_token:
+ args["since"] = [since_token]
+
response = yield self.client.get_json(
destination=remote_server,
path=path,
+ args=args,
)
defer.returnValue(response)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 37c0d4fbc4..fec337be64 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -18,7 +18,9 @@ from twisted.internet import defer
from synapse.api.urls import FEDERATION_PREFIX as PREFIX
from synapse.api.errors import Codes, SynapseError
from synapse.http.server import JsonResource
-from synapse.http.servlet import parse_json_object_from_request
+from synapse.http.servlet import (
+ parse_json_object_from_request, parse_integer_from_args, parse_string_from_args,
+)
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.versionstring import get_version_string
@@ -554,7 +556,11 @@ class PublicRoomList(BaseFederationServlet):
@defer.inlineCallbacks
def on_GET(self, origin, content, query):
- data = yield self.room_list_handler.get_local_public_room_list()
+ limit = parse_integer_from_args(query, "limit", 0)
+ since_token = parse_string_from_args(query, "since", None)
+ data = yield self.room_list_handler.get_local_public_room_list(
+ limit, since_token
+ )
defer.returnValue((200, data))
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index d72e8c99f9..28bc35f8a3 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -18,13 +18,16 @@ from twisted.internet import defer
from ._base import BaseHandler
from synapse.api.constants import (
- EventTypes, JoinRules, Membership,
+ EventTypes, JoinRules,
)
-from synapse.api.errors import SynapseError
from synapse.util.async import concurrently_execute
from synapse.util.caches.response_cache import ResponseCache
+from collections import namedtuple
+from unpaddedbase64 import encode_base64, decode_base64
+
import logging
+import msgpack
logger = logging.getLogger(__name__)
@@ -35,28 +38,130 @@ class RoomListHandler(BaseHandler):
def __init__(self, hs):
super(RoomListHandler, self).__init__(hs)
self.response_cache = ResponseCache(hs)
- self.remote_list_request_cache = ResponseCache(hs)
- self.remote_list_cache = {}
- self.fetch_looping_call = hs.get_clock().looping_call(
- self.fetch_all_remote_lists, REMOTE_ROOM_LIST_POLL_INTERVAL
- )
- self.fetch_all_remote_lists()
- def get_local_public_room_list(self):
- result = self.response_cache.get(())
+ def get_local_public_room_list(self, limit=None, since_token=None):
+ result = self.response_cache.get((limit, since_token))
if not result:
- result = self.response_cache.set((), self._get_public_room_list())
+ result = self.response_cache.set(
+ (limit, since_token),
+ self._get_public_room_list(limit, since_token)
+ )
return result
@defer.inlineCallbacks
- def _get_public_room_list(self):
- room_ids = yield self.store.get_public_room_ids()
+ def _get_public_room_list(self, limit=None, since_token=None):
+ if since_token and since_token != "END":
+ since_token = RoomListNextBatch.from_token(since_token)
+ else:
+ since_token = None
+
+ rooms_to_order_value = {}
+ rooms_to_num_joined = {}
+ rooms_to_latest_event_ids = {}
+
+ newly_visible = []
+ newly_unpublished = []
+ if since_token:
+ stream_token = since_token.stream_ordering
+ current_public_id = yield self.store.get_current_public_room_stream_id()
+ public_room_stream_id = since_token.public_room_stream_id
+ newly_visible, newly_unpublished = yield self.store.get_public_room_changes(
+ public_room_stream_id, current_public_id
+ )
+ else:
+ stream_token = yield self.store.get_room_max_stream_ordering()
+ public_room_stream_id = yield self.store.get_current_public_room_stream_id()
+
+ room_ids = yield self.store.get_public_room_ids_at_stream_id(
+ public_room_stream_id
+ )
+
+ # We want to return rooms in a particular order: the number of joined
+ # users. We then arbitrarily use the room_id as a tie breaker.
+
+ @defer.inlineCallbacks
+ def get_order_for_room(room_id):
+ latest_event_ids = rooms_to_latest_event_ids.get(room_id, None)
+ if not latest_event_ids:
+ latest_event_ids = yield self.store.get_forward_extremeties_for_room(
+ room_id, stream_token
+ )
+ rooms_to_latest_event_ids[room_id] = latest_event_ids
+
+ if not latest_event_ids:
+ return
+
+ joined_users = yield self.state_handler.get_current_user_in_room(
+ room_id, latest_event_ids,
+ )
+ num_joined_users = len(joined_users)
+ rooms_to_num_joined[room_id] = num_joined_users
+
+ if num_joined_users == 0:
+ return
+
+ # We want larger rooms to be first, hence negating num_joined_users
+ rooms_to_order_value[room_id] = (-num_joined_users, room_id)
+
+ yield concurrently_execute(get_order_for_room, room_ids, 10)
+
+ sorted_entries = sorted(rooms_to_order_value.items(), key=lambda e: e[1])
+ sorted_rooms = [room_id for room_id, _ in sorted_entries]
+
+ if since_token:
+ if since_token.direction_is_forward:
+ sorted_rooms = sorted_rooms[since_token.current_limit:]
+ else:
+ sorted_rooms = sorted_rooms[:since_token.current_limit]
+ sorted_rooms.reverse()
- results = []
+ new_limit = None
+ if limit:
+ if sorted_rooms[limit:]:
+ new_limit = limit
+ if since_token:
+ if since_token.direction_is_forward:
+ new_limit += since_token.current_limit
+ else:
+ new_limit = since_token.current_limit - new_limit
+ new_limit = max(0, new_limit)
+ sorted_rooms = sorted_rooms[:limit]
+
+ chunk = []
@defer.inlineCallbacks
def handle_room(room_id):
- current_state = yield self.state_handler.get_current_state(room_id)
+ num_joined_users = rooms_to_num_joined[room_id]
+ if num_joined_users == 0:
+ return
+
+ if room_id in newly_unpublished:
+ return
+
+ result = {
+ "room_id": room_id,
+ "num_joined_members": num_joined_users,
+ }
+
+ current_state_ids = yield self.state_handler.get_current_state_ids(room_id)
+
+ event_map = yield self.store.get_events([
+ event_id for key, event_id in current_state_ids.items()
+ if key[0] in (
+ EventTypes.JoinRules,
+ EventTypes.Name,
+ EventTypes.Topic,
+ EventTypes.CanonicalAlias,
+ EventTypes.RoomHistoryVisibility,
+ EventTypes.GuestAccess,
+ "m.room.avatar",
+ )
+ ])
+
+ current_state = {
+ (ev.type, ev.state_key): ev
+ for ev in event_map.values()
+ }
# Double check that this is actually a public room.
join_rules_event = current_state.get((EventTypes.JoinRules, ""))
@@ -65,18 +170,6 @@ class RoomListHandler(BaseHandler):
if join_rule and join_rule != JoinRules.PUBLIC:
defer.returnValue(None)
- result = {"room_id": room_id}
-
- num_joined_users = len([
- 1 for _, event in current_state.items()
- if event.type == EventTypes.Member
- and event.membership == Membership.JOIN
- ])
- if num_joined_users == 0:
- return
-
- result["num_joined_members"] = num_joined_users
-
aliases = yield self.store.get_aliases_for_room(room_id)
if aliases:
result["aliases"] = aliases
@@ -117,68 +210,87 @@ class RoomListHandler(BaseHandler):
if avatar_url:
result["avatar_url"] = avatar_url
- results.append(result)
+ chunk.append(result)
- yield concurrently_execute(handle_room, room_ids, 10)
+ yield concurrently_execute(handle_room, sorted_rooms, 10)
- # FIXME (erikj): START is no longer a valid value
- defer.returnValue({"start": "START", "end": "END", "chunk": results})
+ chunk.sort(key=lambda e: (-e["num_joined_members"], e["room_id"]))
- @defer.inlineCallbacks
- def fetch_all_remote_lists(self):
- deferred = self.hs.get_replication_layer().get_public_rooms(
- self.hs.config.secondary_directory_servers
- )
- self.remote_list_request_cache.set((), deferred)
- self.remote_list_cache = yield deferred
+ results = {
+ "chunk": chunk,
+ }
+
+ if since_token:
+ results["new_rooms"] = bool(newly_visible)
+
+ if not since_token or since_token.direction_is_forward:
+ if new_limit:
+ results["next_batch"] = RoomListNextBatch(
+ stream_ordering=stream_token,
+ public_room_stream_id=public_room_stream_id,
+ current_limit=new_limit,
+ direction_is_forward=True,
+ ).to_token()
+
+ if since_token:
+ results["prev_batch"] = since_token.copy_and_replace(
+ direction_is_forward=False,
+ ).to_token()
+ else:
+ if new_limit:
+ results["prev_batch"] = RoomListNextBatch(
+ stream_ordering=stream_token,
+ public_room_stream_id=public_room_stream_id,
+ current_limit=new_limit,
+ direction_is_forward=False,
+ ).to_token()
+
+ if since_token:
+ results["next_batch"] = since_token.copy_and_replace(
+ direction_is_forward=True,
+ ).to_token()
+
+ defer.returnValue(results)
@defer.inlineCallbacks
- def get_remote_public_room_list(self, server_name):
+ def get_remote_public_room_list(self, server_name, limit=None, since_token=None):
res = yield self.hs.get_replication_layer().get_public_rooms(
- [server_name]
+ server_name, limit=limit, since_token=since_token,
)
- if server_name not in res:
- raise SynapseError(404, "Server not found")
- defer.returnValue(res[server_name])
-
- @defer.inlineCallbacks
- def get_aggregated_public_room_list(self):
- """
- Get the public room list from this server and the servers
- specified in the secondary_directory_servers config option.
- XXX: Pagination...
- """
- # We return the results from out cache which is updated by a looping call,
- # unless we're missing a cache entry, in which case wait for the result
- # of the fetch if there's one in progress. If not, omit that server.
- wait = False
- for s in self.hs.config.secondary_directory_servers:
- if s not in self.remote_list_cache:
- logger.warn("No cached room list from %s: waiting for fetch", s)
- wait = True
- break
-
- if wait and self.remote_list_request_cache.get(()):
- yield self.remote_list_request_cache.get(())
-
- public_rooms = yield self.get_local_public_room_list()
-
- # keep track of which room IDs we've seen so we can de-dup
- room_ids = set()
-
- # tag all the ones in our list with our server name.
- # Also add the them to the de-deping set
- for room in public_rooms['chunk']:
- room["server_name"] = self.hs.hostname
- room_ids.add(room["room_id"])
-
- # Now add the results from federation
- for server_name, server_result in self.remote_list_cache.items():
- for room in server_result["chunk"]:
- if room["room_id"] not in room_ids:
- room["server_name"] = server_name
- public_rooms["chunk"].append(room)
- room_ids.add(room["room_id"])
-
- defer.returnValue(public_rooms)
+ defer.returnValue(res)
+
+
+class RoomListNextBatch(namedtuple("RoomListNextBatch", (
+ "stream_ordering", # stream_ordering of the first public room list
+ "public_room_stream_id", # public room stream id for first public room list
+ "current_limit", # The number of previous rooms returned
+ "direction_is_forward", # Bool if this is a next_batch, false if prev_batch
+))):
+
+ KEY_DICT = {
+ "stream_ordering": "s",
+ "public_room_stream_id": "p",
+ "current_limit": "n",
+ "direction_is_forward": "d",
+ }
+
+ REVERSE_KEY_DICT = {v: k for k, v in KEY_DICT.items()}
+
+ @classmethod
+ def from_token(cls, token):
+ return RoomListNextBatch(**{
+ cls.REVERSE_KEY_DICT[key]: val
+ for key, val in msgpack.loads(decode_base64(token)).items()
+ })
+
+ def to_token(self):
+ return encode_base64(msgpack.dumps({
+ self.KEY_DICT[key]: val
+ for key, val in self._asdict().items()
+ }))
+
+ def copy_and_replace(self, **kwds):
+ return self._replace(
+ **kwds
+ )
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index e41afeab8e..9346386238 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -41,9 +41,13 @@ def parse_integer(request, name, default=None, required=False):
SynapseError: if the parameter is absent and required, or if the
parameter is present and not an integer.
"""
- if name in request.args:
+ return parse_integer_from_args(request.args, name, default, required)
+
+
+def parse_integer_from_args(args, name, default=None, required=False):
+ if name in args:
try:
- return int(request.args[name][0])
+ return int(args[name][0])
except:
message = "Query parameter %r must be an integer" % (name,)
raise SynapseError(400, message)
@@ -116,9 +120,15 @@ def parse_string(request, name, default=None, required=False,
parameter is present, must be one of a list of allowed values and
is not one of those allowed values.
"""
+ return parse_string_from_args(
+ request.args, name, default, required, allowed_values, param_type,
+ )
- if name in request.args:
- value = request.args[name][0]
+
+def parse_string_from_args(args, name, default=None, required=False,
+ allowed_values=None, param_type="string"):
+ if name in args:
+ value = args[name][0]
if allowed_values is not None and value not in allowed_values:
message = "Query parameter %r must be one of [%s]" % (
name, ", ".join(repr(v) for v in allowed_values)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 86e3d89154..b9e41770ee 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -36,6 +36,7 @@ REQUIREMENTS = {
"blist": ["blist"],
"pysaml2>=3.0.0,<4.0.0": ["saml2>=3.0.0,<4.0.0"],
"pymacaroons-pynacl": ["pymacaroons"],
+ "msgpack-python>=0.3.0": ["msgpack"],
}
CONDITIONAL_REQUIREMENTS = {
"web_client": {
diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py
index 299e9419a4..9aab3ce23c 100644
--- a/synapse/replication/resource.py
+++ b/synapse/replication/resource.py
@@ -42,6 +42,7 @@ STREAM_NAMES = (
("pushers",),
("caches",),
("to_device",),
+ ("public_rooms",),
)
@@ -131,6 +132,7 @@ class ReplicationResource(Resource):
push_rules_token, room_stream_token = self.store.get_push_rules_stream_token()
pushers_token = self.store.get_pushers_stream_token()
caches_token = self.store.get_cache_stream_token()
+ public_rooms_token = self.store.get_current_public_room_stream_id()
defer.returnValue(_ReplicationToken(
room_stream_token,
@@ -144,6 +146,7 @@ class ReplicationResource(Resource):
0, # State stream is no longer a thing
caches_token,
int(stream_token.to_device_key),
+ int(public_rooms_token),
))
@request_handler()
@@ -193,6 +196,7 @@ class ReplicationResource(Resource):
yield self.pushers(writer, current_token, limit, request_streams)
yield self.caches(writer, current_token, limit, request_streams)
yield self.to_device(writer, current_token, limit, request_streams)
+ yield self.public_rooms(writer, current_token, limit, request_streams)
self.streams(writer, current_token, request_streams)
logger.debug("Replicated %d rows", writer.total)
@@ -400,6 +404,20 @@ class ReplicationResource(Resource):
"position", "user_id", "device_id", "message_json"
))
+ @defer.inlineCallbacks
+ def public_rooms(self, writer, current_token, limit, request_streams):
+ current_position = current_token.public_rooms
+
+ public_rooms = request_streams.get("public_rooms")
+
+ if public_rooms is not None:
+ public_rooms_rows = yield self.store.get_all_new_public_rooms(
+ public_rooms, current_position, limit
+ )
+ writer.write_header_and_rows("public_rooms", public_rooms_rows, (
+ "position", "room_id", "visibility"
+ ))
+
class _Writer(object):
"""Writes the streams as a JSON object as the response to the request"""
@@ -428,7 +446,7 @@ class _Writer(object):
class _ReplicationToken(collections.namedtuple("_ReplicationToken", (
"events", "presence", "typing", "receipts", "account_data", "backfill",
- "push_rules", "pushers", "state", "caches", "to_device",
+ "push_rules", "pushers", "state", "caches", "to_device", "public_rooms",
))):
__slots__ = []
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index 15c52774a2..f8965c73a0 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -61,6 +61,8 @@ class SlavedEventStore(BaseSlavedStore):
"MembershipStreamChangeCache", events_max,
)
+ self.stream_ordering_month_ago = 0
+
# Cached functions can't be accessed through a class instance so we need
# to reach inside the __dict__ to extract them.
get_rooms_for_user = RoomMemberStore.__dict__["get_rooms_for_user"]
@@ -168,6 +170,12 @@ class SlavedEventStore(BaseSlavedStore):
get_auth_chain_ids = DataStore.get_auth_chain_ids.__func__
_get_auth_chain_ids_txn = DataStore._get_auth_chain_ids_txn.__func__
+ get_room_max_stream_ordering = DataStore.get_room_max_stream_ordering.__func__
+
+ get_forward_extremeties_for_room = (
+ DataStore.get_forward_extremeties_for_room.__func__
+ )
+
def stream_positions(self):
result = super(SlavedEventStore, self).stream_positions()
result["events"] = self._stream_id_gen.get_current_token()
diff --git a/synapse/replication/slave/storage/room.py b/synapse/replication/slave/storage/room.py
index d5bb0f98ea..81743941dc 100644
--- a/synapse/replication/slave/storage/room.py
+++ b/synapse/replication/slave/storage/room.py
@@ -15,7 +15,38 @@
from ._base import BaseSlavedStore
from synapse.storage import DataStore
+from ._slaved_id_tracker import SlavedIdTracker
class RoomStore(BaseSlavedStore):
+ def __init__(self, db_conn, hs):
+ super(RoomStore, self).__init__(db_conn, hs)
+ self._public_room_id_gen = SlavedIdTracker(
+ db_conn, "public_room_list_stream", "stream_id"
+ )
+
get_public_room_ids = DataStore.get_public_room_ids.__func__
+ get_current_public_room_stream_id = (
+ DataStore.get_current_public_room_stream_id.__func__
+ )
+ get_public_room_ids_at_stream_id = (
+ DataStore.get_public_room_ids_at_stream_id.__func__
+ )
+ get_public_room_ids_at_stream_id_txn = (
+ DataStore.get_public_room_ids_at_stream_id_txn.__func__
+ )
+ get_published_at_stream_id_txn = (
+ DataStore.get_published_at_stream_id_txn.__func__
+ )
+
+ def stream_positions(self):
+ result = super(RoomStore, self).stream_positions()
+ result["public_rooms"] = self._public_room_id_gen.get_current_token()
+ return result
+
+ def process_replication(self, result):
+ stream = result.get("public_rooms")
+ if stream:
+ self._public_room_id_gen.advance(int(stream["position"]))
+
+ return super(RoomStore, self).process_replication(result)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 22d6a7d31e..924c785358 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -23,7 +23,9 @@ from synapse.api.constants import EventTypes, Membership
from synapse.api.filtering import Filter
from synapse.types import UserID, RoomID, RoomAlias
from synapse.events.utils import serialize_event, format_event_for_client_v2
-from synapse.http.servlet import parse_json_object_from_request, parse_string
+from synapse.http.servlet import (
+ parse_json_object_from_request, parse_string, parse_integer
+)
import logging
import urllib
@@ -317,11 +319,21 @@ class PublicRoomListRestServlet(ClientV1RestServlet):
else:
pass
+ limit = parse_integer(request, "limit", 100)
+ since_token = parse_string(request, "since", None)
+
handler = self.hs.get_room_list_handler()
if server:
- data = yield handler.get_remote_public_room_list(server)
+ data = yield handler.get_remote_public_room_list(
+ server,
+ limit=limit,
+ since_token=since_token,
+ )
else:
- data = yield handler.get_aggregated_public_room_list()
+ data = yield handler.get_local_public_room_list(
+ limit=limit,
+ since_token=since_token,
+ )
defer.returnValue((200, data))
diff --git a/synapse/state.py b/synapse/state.py
index d89aca26b1..b4eca0e5d5 100644
--- a/synapse/state.py
+++ b/synapse/state.py
@@ -157,8 +157,9 @@ class StateHandler(object):
defer.returnValue(state)
@defer.inlineCallbacks
- def get_current_user_in_room(self, room_id):
- latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
+ def get_current_user_in_room(self, room_id, latest_event_ids=None):
+ if not latest_event_ids:
+ latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
entry = yield self.resolve_state_groups(room_id, latest_event_ids)
joined_users = yield self.store.get_joined_users_from_state(
room_id, entry.state_id, entry.state
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index a61e83d5de..0099a3f5bb 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -113,6 +113,9 @@ class DataStore(RoomMemberStore, RoomStore,
self._device_inbox_id_gen = StreamIdGenerator(
db_conn, "device_max_stream_id", "stream_id"
)
+ self._public_room_id_gen = StreamIdGenerator(
+ db_conn, "public_room_list_stream", "stream_id"
+ )
self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 0827946207..ec6dbe5492 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -16,6 +16,7 @@
from twisted.internet import defer
from ._base import SQLBaseStore
+from synapse.api.errors import StoreError
from synapse.util.caches.descriptors import cached
from unpaddedbase64 import encode_base64
@@ -36,6 +37,13 @@ class EventFederationStore(SQLBaseStore):
and backfilling from another server respectively.
"""
+ def __init__(self, hs):
+ super(EventFederationStore, self).__init__(hs)
+
+ hs.get_clock().looping_call(
+ self._delete_old_forward_extrem_cache, 60 * 60 * 1000
+ )
+
def get_auth_chain(self, event_ids):
return self.get_auth_chain_ids(event_ids).addCallback(self._get_events)
@@ -270,6 +278,37 @@ class EventFederationStore(SQLBaseStore):
]
)
+ # We now insert into stream_ordering_to_exterm a mapping from room_id,
+ # new stream_ordering to new forward extremeties in the room.
+ # This allows us to later efficiently look up the forward extremeties
+ # for a room before a given stream_ordering
+ max_stream_ord = max(
+ ev.internal_metadata.stream_ordering for ev in events
+ )
+ new_extrem = {}
+ for room_id in events_by_room:
+ event_ids = self._simple_select_onecol_txn(
+ txn,
+ table="event_forward_extremities",
+ keyvalues={"room_id": room_id},
+ retcol="event_id",
+ )
+ new_extrem[room_id] = event_ids
+
+ self._simple_insert_many_txn(
+ txn,
+ table="stream_ordering_to_exterm",
+ values=[
+ {
+ "room_id": room_id,
+ "event_id": event_id,
+ "stream_ordering": max_stream_ord,
+ }
+ for room_id, extrem_evs in new_extrem.items()
+ for event_id in extrem_evs
+ ]
+ )
+
query = (
"INSERT INTO event_backward_extremities (event_id, room_id)"
" SELECT ?, ? WHERE NOT EXISTS ("
@@ -305,6 +344,48 @@ class EventFederationStore(SQLBaseStore):
self.get_latest_event_ids_in_room.invalidate, (room_id,)
)
+ def get_forward_extremeties_for_room(self, room_id, stream_ordering):
+ """For a given room_id and stream_ordering, return the forward
+ extremeties of the room at that point in "time".
+
+ Throws a StoreError if we have since purged the index for
+ stream_orderings from that point.
+ """
+
+ if stream_ordering <= self.stream_ordering_month_ago:
+ raise StoreError(400, "stream_ordering too old")
+
+ sql = ("""
+ SELECT event_id FROM stream_ordering_to_exterm
+ INNER JOIN (
+ SELECT room_id, MAX(stream_ordering) AS stream_ordering
+ FROM stream_ordering_to_exterm
+ WHERE stream_ordering < ? GROUP BY room_id
+ ) AS rms USING (room_id, stream_ordering)
+ WHERE room_id = ?
+ """)
+
+ def get_forward_extremeties_for_room_txn(txn):
+ txn.execute(sql, (stream_ordering, room_id))
+ rows = txn.fetchall()
+ return [event_id for event_id, in rows]
+
+ return self.runInteraction(
+ "get_forward_extremeties_for_room",
+ get_forward_extremeties_for_room_txn
+ )
+
+ def _delete_old_forward_extrem_cache(self):
+ def _delete_old_forward_extrem_cache_txn(txn):
+ txn.execute(
+ "DELETE FROM stream_ordering_to_exterm WHERE stream_ordering < ?",
+ (self.stream_ordering_month_ago,)
+ )
+ return self.runInteraction(
+ "_delete_old_forward_extrem_cache",
+ _delete_old_forward_extrem_cache_txn
+ )
+
def get_backfill_events(self, room_id, event_list, limit):
"""Get a list of Events for a given topic that occurred before (and
including) the events in event_list. Return a list of max size `limit`
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 8251f58670..2ef13d7403 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -48,15 +48,31 @@ class RoomStore(SQLBaseStore):
StoreError if the room could not be stored.
"""
try:
- yield self._simple_insert(
- "rooms",
- {
- "room_id": room_id,
- "creator": room_creator_user_id,
- "is_public": is_public,
- },
- desc="store_room",
- )
+ def store_room_txn(txn, next_id):
+ self._simple_insert_txn(
+ txn,
+ "rooms",
+ {
+ "room_id": room_id,
+ "creator": room_creator_user_id,
+ "is_public": is_public,
+ },
+ )
+ if is_public:
+ self._simple_insert_txn(
+ txn,
+ table="public_room_list_stream",
+ values={
+ "stream_id": next_id,
+ "room_id": room_id,
+ "visibility": is_public,
+ }
+ )
+ with self._public_room_id_gen.get_next() as next_id:
+ yield self.runInteraction(
+ "store_room_txn",
+ store_room_txn, next_id,
+ )
except Exception as e:
logger.error("store_room with room_id=%s failed: %s", room_id, e)
raise StoreError(500, "Problem creating room.")
@@ -77,13 +93,45 @@ class RoomStore(SQLBaseStore):
allow_none=True,
)
+ @defer.inlineCallbacks
def set_room_is_public(self, room_id, is_public):
- return self._simple_update_one(
- table="rooms",
- keyvalues={"room_id": room_id},
- updatevalues={"is_public": is_public},
- desc="set_room_is_public",
- )
+ def set_room_is_public_txn(txn, next_id):
+ self._simple_update_one_txn(
+ txn,
+ table="rooms",
+ keyvalues={"room_id": room_id},
+ updatevalues={"is_public": is_public},
+ )
+
+ entries = self._simple_select_list_txn(
+ txn,
+ table="public_room_list_stream",
+ keyvalues={"room_id": room_id},
+ retcols=("stream_id", "visibility"),
+ )
+
+ entries.sort(key=lambda r: r["stream_id"])
+
+ add_to_stream = True
+ if entries:
+ add_to_stream = bool(entries[-1]["visibility"]) != is_public
+
+ if add_to_stream:
+ self._simple_insert_txn(
+ txn,
+ table="public_room_list_stream",
+ values={
+ "stream_id": next_id,
+ "room_id": room_id,
+ "visibility": is_public,
+ }
+ )
+
+ with self._public_room_id_gen.get_next() as next_id:
+ yield self.runInteraction(
+ "set_room_is_public",
+ set_room_is_public_txn, next_id,
+ )
def get_public_room_ids(self):
return self._simple_select_onecol(
@@ -207,3 +255,71 @@ class RoomStore(SQLBaseStore):
},
desc="add_event_report"
)
+
+ def get_current_public_room_stream_id(self):
+ return self._public_room_id_gen.get_current_token()
+
+ def get_public_room_ids_at_stream_id(self, stream_id):
+ return self.runInteraction(
+ "get_public_room_ids_at_stream_id",
+ self.get_public_room_ids_at_stream_id_txn, stream_id
+ )
+
+ def get_public_room_ids_at_stream_id_txn(self, txn, stream_id):
+ return {
+ rm
+ for rm, vis in self.get_published_at_stream_id_txn(txn, stream_id).items()
+ if vis
+ }
+
+ def get_published_at_stream_id_txn(self, txn, stream_id):
+ sql = ("""
+ SELECT room_id, visibility FROM public_room_list_stream
+ INNER JOIN (
+ SELECT room_id, max(stream_id) AS stream_id
+ FROM public_room_list_stream
+ WHERE stream_id <= ?
+ GROUP BY room_id
+ ) grouped USING (room_id, stream_id)
+ """)
+
+ txn.execute(sql, (stream_id,))
+ return dict(txn.fetchall())
+
+ def get_public_room_changes(self, prev_stream_id, new_stream_id):
+ def get_public_room_changes_txn(txn):
+ then_rooms = self.get_public_room_ids_at_stream_id_txn(txn, prev_stream_id)
+
+ now_rooms_dict = self.get_published_at_stream_id_txn(txn, new_stream_id)
+
+ now_rooms_visible = set(
+ rm for rm, vis in now_rooms_dict.items() if vis
+ )
+ now_rooms_not_visible = set(
+ rm for rm, vis in now_rooms_dict.items() if not vis
+ )
+
+ newly_visible = now_rooms_visible - then_rooms
+ newly_unpublished = now_rooms_not_visible & then_rooms
+
+ return newly_visible, newly_unpublished
+
+ return self.runInteraction(
+ "get_public_room_changes", get_public_room_changes_txn
+ )
+
+ def get_all_new_public_rooms(self, prev_id, current_id, limit):
+ def get_all_new_public_rooms(txn):
+ sql = ("""
+ SELECT stream_id, room_id, visibility FROM public_room_list_stream
+ WHERE stream_id > ? AND stream_id <= ?
+ ORDER BY stream_id ASC
+ LIMIT ?
+ """)
+
+ txn.execute(sql, (prev_id, current_id, limit,))
+ return txn.fetchall()
+
+ return self.runInteraction(
+ "get_all_new_public_rooms", get_all_new_public_rooms
+ )
diff --git a/synapse/storage/schema/delta/35/public_room_list_change_stream.sql b/synapse/storage/schema/delta/35/public_room_list_change_stream.sql
new file mode 100644
index 0000000000..dd2bf2e28a
--- /dev/null
+++ b/synapse/storage/schema/delta/35/public_room_list_change_stream.sql
@@ -0,0 +1,33 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE public_room_list_stream (
+ stream_id BIGINT NOT NULL,
+ room_id TEXT NOT NULL,
+ visibility BOOLEAN NOT NULL
+);
+
+INSERT INTO public_room_list_stream (stream_id, room_id, visibility)
+ SELECT 1, room_id, is_public FROM rooms
+ WHERE is_public = CAST(1 AS BOOLEAN);
+
+CREATE INDEX public_room_list_stream_idx on public_room_list_stream(
+ stream_id
+);
+
+CREATE INDEX public_room_list_stream_rm_idx on public_room_list_stream(
+ room_id, stream_id
+);
diff --git a/synapse/storage/schema/delta/35/stream_order_to_extrem.sql b/synapse/storage/schema/delta/35/stream_order_to_extrem.sql
new file mode 100644
index 0000000000..2b945d8a57
--- /dev/null
+++ b/synapse/storage/schema/delta/35/stream_order_to_extrem.sql
@@ -0,0 +1,37 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+CREATE TABLE stream_ordering_to_exterm (
+ stream_ordering BIGINT NOT NULL,
+ room_id TEXT NOT NULL,
+ event_id TEXT NOT NULL
+);
+
+INSERT INTO stream_ordering_to_exterm (stream_ordering, room_id, event_id)
+ SELECT stream_ordering, room_id, event_id FROM event_forward_extremities
+ INNER JOIN (
+ SELECT room_id, max(stream_ordering) as stream_ordering FROM events
+ INNER JOIN event_forward_extremities USING (room_id, event_id)
+ GROUP BY room_id
+ ) AS rms USING (room_id);
+
+CREATE INDEX stream_ordering_to_exterm_idx on stream_ordering_to_exterm(
+ stream_ordering
+);
+
+CREATE INDEX stream_ordering_to_exterm_rm_idx on stream_ordering_to_exterm(
+ room_id, stream_ordering
+);
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 0577a0525b..07ea969d4d 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -531,6 +531,9 @@ class StreamStore(SQLBaseStore):
)
defer.returnValue("t%d-%d" % (topo, token))
+ def get_room_max_stream_ordering(self):
+ return self._stream_id_gen.get_current_token()
+
def get_stream_token_for_event(self, event_id):
"""The stream token for an event
Args:
|