diff options
-rw-r--r-- | README.rst | 7 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 4 | ||||
-rwxr-xr-x | synapse/app/synctl.py | 3 | ||||
-rw-r--r-- | synapse/config/server.py | 6 | ||||
-rw-r--r-- | synapse/events/snapshot.py | 3 | ||||
-rw-r--r-- | synapse/events/utils.py | 23 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 41 | ||||
-rw-r--r-- | synapse/handlers/message.py | 19 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 1 | ||||
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 27 | ||||
-rw-r--r-- | synapse/rest/client/v1/directory.py | 6 | ||||
-rw-r--r-- | synapse/rest/client/v1/room.py | 1 | ||||
-rw-r--r-- | synapse/rest/client/v2_alpha/sync.py | 1 | ||||
-rw-r--r-- | synapse/storage/_base.py | 8 | ||||
-rw-r--r-- | synapse/storage/account_data.py | 13 | ||||
-rw-r--r-- | synapse/storage/background_updates.py | 83 | ||||
-rw-r--r-- | synapse/storage/events.py | 32 | ||||
-rw-r--r-- | synapse/storage/push_rule.py | 5 | ||||
-rw-r--r-- | synapse/storage/state.py | 17 | ||||
-rw-r--r-- | synapse/util/__init__.py | 2 | ||||
-rw-r--r-- | synapse/visibility.py | 19 | ||||
-rw-r--r-- | tests/storage/event_injector.py | 4 | ||||
-rw-r--r-- | tests/storage/test_events.py | 2 |
23 files changed, 213 insertions, 114 deletions
diff --git a/README.rst b/README.rst index d090ad86a2..9503ef510a 100644 --- a/README.rst +++ b/README.rst @@ -354,8 +354,11 @@ ArchLinux --------- The quickest way to get up and running with ArchLinux is probably with the community package -https://www.archlinux.org/packages/community/any/matrix-synapse/, which should pull in all -the necessary dependencies. +https://www.archlinux.org/packages/community/any/matrix-synapse/, which should pull in most of +the necessary dependencies. If the default web client is to be served (enabled by default in +the generated config), +https://www.archlinux.org/packages/community/any/python2-matrix-angular-sdk/ will also need to +be installed. Alternatively, to install using pip a few changes may be needed as ArchLinux defaults to python 3, but synapse currently assumes python 2.7 by default: diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 6f5924d2c7..3457402596 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -25,7 +25,7 @@ import synapse.config.logger from synapse.config._base import ConfigError from synapse.python_dependencies import ( - check_requirements, DEPENDENCY_LINKS + check_requirements, CONDITIONAL_REQUIREMENTS ) from synapse.rest import ClientRestResource @@ -92,7 +92,7 @@ def build_resource_for_web_client(hs): "\n" "You can also disable hosting of the webclient via the\n" "configuration option `web_client`\n" - % {"dep": DEPENDENCY_LINKS["matrix-angular-sdk"]} + % {"dep": CONDITIONAL_REQUIREMENTS["web_client"].keys()[0]} ) syweb_path = os.path.dirname(syweb.__file__) webclient_path = os.path.join(syweb_path, "webclient") diff --git a/synapse/app/synctl.py b/synapse/app/synctl.py index 8223734845..3bd7ef7bba 100755 --- a/synapse/app/synctl.py +++ b/synapse/app/synctl.py @@ -234,6 +234,9 @@ def main(): if action == "start" or action == "restart": if start_stop_synapse: + # Check if synapse is already running + if os.path.exists(pidfile) and pid_running(int(open(pidfile).read())): + abort("synapse.app.homeserver already running") start(configfile) for worker in workers: diff --git a/synapse/config/server.py b/synapse/config/server.py index 1f9999d57a..25e6666238 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -144,6 +144,12 @@ class ServerConfig(Config): # Whether to serve a web client from the HTTP/HTTPS root resource. web_client: True + # The root directory to server for the above web client. + # If left undefined, synapse will serve the matrix-angular-sdk web client. + # Make sure matrix-angular-sdk is installed with pip if web_client is True + # and web_client_location is undefined + # web_client_location: "/path/to/web/root" + # The public-facing base URL for the client API (not including _matrix/...) # public_baseurl: https://example.com:8448/ diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index 6be18880b9..e9a732ff03 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -50,6 +50,7 @@ class EventContext(object): "prev_group", "delta_ids", "prev_state_events", + "app_service", ] def __init__(self): @@ -68,3 +69,5 @@ class EventContext(object): self.delta_ids = None self.prev_state_events = None + + self.app_service = None diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 5bbaef8187..824f4a42e3 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -225,7 +225,22 @@ def format_event_for_client_v2_without_room_id(d): def serialize_event(e, time_now_ms, as_client_event=True, event_format=format_event_for_client_v1, - token_id=None, only_event_fields=None): + token_id=None, only_event_fields=None, is_invite=False): + """Serialize event for clients + + Args: + e (EventBase) + time_now_ms (int) + as_client_event (bool) + event_format + token_id + only_event_fields + is_invite (bool): Whether this is an invite that is being sent to the + invitee + + Returns: + dict + """ # FIXME(erikj): To handle the case of presence events and the like if not isinstance(e, EventBase): return e @@ -251,6 +266,12 @@ def serialize_event(e, time_now_ms, as_client_event=True, if txn_id is not None: d["unsigned"]["transaction_id"] = txn_id + # If this is an invite for somebody else, then we don't care about the + # invite_room_state as that's meant solely for the invitee. Other clients + # will already have the state since they're in the room. + if not is_invite: + d["unsigned"].pop("invite_room_state", None) + if as_client_event: d = event_format(d) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 52be5a402d..2af9849ed0 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -172,8 +172,22 @@ class FederationHandler(BaseHandler): origin, pdu, prevs, min_depth ) - prevs = {e_id for e_id, _ in pdu.prev_events} - seen = set(have_seen.keys()) + # Update the set of things we've seen after trying to + # fetch the missing stuff + have_seen = yield self.store.have_events(prevs) + seen = set(have_seen.iterkeys()) + + if not prevs - seen: + logger.info( + "Found all missing prev events for %s", pdu.event_id + ) + elif prevs - seen: + logger.info( + "Not fetching %d missing events for room %r,event %s: %r...", + len(prevs - seen), pdu.room_id, pdu.event_id, + list(prevs - seen)[:5], + ) + if prevs - seen: logger.info( "Still missing %d events for room %r: %r...", @@ -208,19 +222,15 @@ class FederationHandler(BaseHandler): Args: origin (str): Origin of the pdu. Will be called to get the missing events pdu: received pdu - prevs (str[]): List of event ids which we are missing + prevs (set(str)): List of event ids which we are missing min_depth (int): Minimum depth of events to return. - - Returns: - Deferred<dict(str, str?)>: updated have_seen dictionary """ # We recalculate seen, since it may have changed. have_seen = yield self.store.have_events(prevs) seen = set(have_seen.keys()) if not prevs - seen: - # nothing left to do - defer.returnValue(have_seen) + return latest = yield self.store.get_latest_event_ids_in_room( pdu.room_id @@ -232,8 +242,8 @@ class FederationHandler(BaseHandler): latest |= seen logger.info( - "Missing %d events for room %r: %r...", - len(prevs - seen), pdu.room_id, list(prevs - seen)[:5] + "Missing %d events for room %r pdu %s: %r...", + len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5] ) # XXX: we set timeout to 10s to help workaround @@ -265,22 +275,23 @@ class FederationHandler(BaseHandler): timeout=10000, ) + logger.info( + "Got %d events: %r...", + len(missing_events), [e.event_id for e in missing_events[:5]] + ) + # We want to sort these by depth so we process them and # tell clients about them in order. missing_events.sort(key=lambda x: x.depth) for e in missing_events: + logger.info("Handling found event %s", e.event_id) yield self.on_receive_pdu( origin, e, get_missing=False ) - have_seen = yield self.store.have_events( - [ev for ev, _ in pdu.prev_events] - ) - defer.returnValue(have_seen) - @log_function @defer.inlineCallbacks def _process_received_pdu(self, origin, pdu, state, auth_chain): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 348056add5..57265c6d7d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -175,7 +175,8 @@ class MessageHandler(BaseHandler): defer.returnValue(chunk) @defer.inlineCallbacks - def create_event(self, event_dict, token_id=None, txn_id=None, prev_event_ids=None): + def create_event(self, requester, event_dict, token_id=None, txn_id=None, + prev_event_ids=None): """ Given a dict from a client, create a new event. @@ -185,6 +186,7 @@ class MessageHandler(BaseHandler): Adds display names to Join membership events. Args: + requester event_dict (dict): An entire event token_id (str) txn_id (str) @@ -226,6 +228,7 @@ class MessageHandler(BaseHandler): event, context = yield self._create_new_client_event( builder=builder, + requester=requester, prev_event_ids=prev_event_ids, ) @@ -319,6 +322,7 @@ class MessageHandler(BaseHandler): See self.create_event and self.send_nonmember_event. """ event, context = yield self.create_event( + requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id @@ -416,7 +420,7 @@ class MessageHandler(BaseHandler): @measure_func("_create_new_client_event") @defer.inlineCallbacks - def _create_new_client_event(self, builder, prev_event_ids=None): + def _create_new_client_event(self, builder, requester=None, prev_event_ids=None): if prev_event_ids: prev_events = yield self.store.add_event_hashes(prev_event_ids) prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids) @@ -456,6 +460,8 @@ class MessageHandler(BaseHandler): state_handler = self.state_handler context = yield state_handler.compute_event_context(builder) + if requester: + context.app_service = requester.app_service if builder.is_state(): builder.prev_state = yield self.store.add_event_hashes( @@ -531,9 +537,9 @@ class MessageHandler(BaseHandler): state_to_include_ids = [ e_id - for k, e_id in context.current_state_ids.items() + for k, e_id in context.current_state_ids.iteritems() if k[0] in self.hs.config.room_invite_state_types - or k[0] == EventTypes.Member and k[1] == event.sender + or k == (EventTypes.Member, event.sender) ] state_to_include = yield self.store.get_events(state_to_include_ids) @@ -545,7 +551,7 @@ class MessageHandler(BaseHandler): "content": e.content, "sender": e.sender, } - for e in state_to_include.values() + for e in state_to_include.itervalues() ] invitee = UserID.from_string(event.state_key) @@ -618,6 +624,3 @@ class MessageHandler(BaseHandler): ) preserve_fn(_notify)() - - # If invite, remove room_state from unsigned before sending. - event.unsigned.pop("invite_room_state", None) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 28b2c80a93..ab87632d99 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -70,6 +70,7 @@ class RoomMemberHandler(BaseHandler): content["kind"] = "guest" event, context = yield msg_handler.create_event( + requester, { "type": EventTypes.Member, "content": content, diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index f943ff640f..cb13874ccf 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -20,7 +20,6 @@ from twisted.internet import defer from .push_rule_evaluator import PushRuleEvaluatorForEvent from synapse.api.constants import EventTypes -from synapse.visibility import filter_events_for_clients_context logger = logging.getLogger(__name__) @@ -67,17 +66,6 @@ class BulkPushRuleEvaluator: def action_for_event_by_user(self, event, context): actions_by_user = {} - # None of these users can be peeking since this list of users comes - # from the set of users in the room, so we know for sure they're all - # actually in the room. - user_tuples = [ - (u, False) for u in self.rules_by_user.keys() - ] - - filtered_by_user = yield filter_events_for_clients_context( - self.store, user_tuples, [event], {event.event_id: context} - ) - room_members = yield self.store.get_joined_users_from_context( event, context ) @@ -87,6 +75,14 @@ class BulkPushRuleEvaluator: condition_cache = {} for uid, rules in self.rules_by_user.items(): + if event.sender == uid: + continue + + if not event.is_state(): + is_ignored = yield self.store.is_ignored_by(event.sender, uid) + if is_ignored: + continue + display_name = None profile_info = room_members.get(uid) if profile_info: @@ -98,13 +94,6 @@ class BulkPushRuleEvaluator: if event.type == EventTypes.Member and event.state_key == uid: display_name = event.content.get("displayname", None) - filtered = filtered_by_user[uid] - if len(filtered) == 0: - continue - - if filtered[0].sender == uid: - continue - for rule in rules: if 'enabled' in rule and not rule['enabled']: continue diff --git a/synapse/rest/client/v1/directory.py b/synapse/rest/client/v1/directory.py index 8930f1826f..f15aa5c13f 100644 --- a/synapse/rest/client/v1/directory.py +++ b/synapse/rest/client/v1/directory.py @@ -39,6 +39,7 @@ class ClientDirectoryServer(ClientV1RestServlet): def __init__(self, hs): super(ClientDirectoryServer, self).__init__(hs) + self.store = hs.get_datastore() self.handlers = hs.get_handlers() @defer.inlineCallbacks @@ -70,7 +71,10 @@ class ClientDirectoryServer(ClientV1RestServlet): logger.debug("Got servers: %s", servers) # TODO(erikj): Check types. - # TODO(erikj): Check that room exists + + room = yield self.store.get_room(room_id) + if room is None: + raise SynapseError(400, "Room does not exist") dir_handler = self.handlers.directory_handler diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index c376ab8fd7..cd388770c8 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -164,6 +164,7 @@ class RoomStateEventRestServlet(ClientV1RestServlet): else: msg_handler = self.handlers.message_handler event, context = yield msg_handler.create_event( + requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id, diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index a7a9e0a794..f30eab76fd 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -253,6 +253,7 @@ class SyncRestServlet(RestServlet): invite = serialize_event( room.invite, time_now, token_id=token_id, event_format=format_event_for_client_v2_without_room_id, + is_invite=True, ) unsigned = dict(invite.get("unsigned", {})) invite["unsigned"] = unsigned diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index c659004e8d..58b73af7d2 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -60,12 +60,12 @@ class LoggingTransaction(object): object.__setattr__(self, "database_engine", database_engine) object.__setattr__(self, "after_callbacks", after_callbacks) - def call_after(self, callback, *args): + def call_after(self, callback, *args, **kwargs): """Call the given callback on the main twisted thread after the transaction has finished. Used to invalidate the caches on the correct thread. """ - self.after_callbacks.append((callback, args)) + self.after_callbacks.append((callback, args, kwargs)) def __getattr__(self, name): return getattr(self.txn, name) @@ -319,8 +319,8 @@ class SQLBaseStore(object): inner_func, *args, **kwargs ) finally: - for after_callback, after_args in after_callbacks: - after_callback(*after_args) + for after_callback, after_args, after_kwargs in after_callbacks: + after_callback(*after_args, **after_kwargs) defer.returnValue(result) @defer.inlineCallbacks diff --git a/synapse/storage/account_data.py b/synapse/storage/account_data.py index aa84ffc2b0..ff14e54c11 100644 --- a/synapse/storage/account_data.py +++ b/synapse/storage/account_data.py @@ -308,3 +308,16 @@ class AccountDataStore(SQLBaseStore): " WHERE stream_id < ?" ) txn.execute(update_max_id_sql, (next_id, next_id)) + + @cachedInlineCallbacks(num_args=2, cache_context=True, max_entries=5000) + def is_ignored_by(self, ignored_user_id, ignorer_user_id, cache_context): + ignored_account_data = yield self.get_global_account_data_by_type_for_user( + "m.ignored_user_list", ignorer_user_id, + on_invalidate=cache_context.invalidate, + ) + if not ignored_account_data: + defer.returnValue(False) + + defer.returnValue( + ignored_user_id in ignored_account_data.get("ignored_users", {}) + ) diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 813ad59e56..d4cf0fc59b 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -228,46 +228,69 @@ class BackgroundUpdateStore(SQLBaseStore): columns (list[str]): columns/expressions to include in index """ - # if this is postgres, we add the indexes concurrently. Otherwise - # we fall back to doing it inline - if isinstance(self.database_engine, engines.PostgresEngine): - conc = True - else: - conc = False - # We don't use partial indices on SQLite as it wasn't introduced - # until 3.8, and wheezy has 3.7 - where_clause = None - - sql = ( - "CREATE INDEX %(conc)s %(name)s ON %(table)s (%(columns)s)" - " %(where_clause)s" - ) % { - "conc": "CONCURRENTLY" if conc else "", - "name": index_name, - "table": table, - "columns": ", ".join(columns), - "where_clause": "WHERE " + where_clause if where_clause else "" - } - - def create_index_concurrently(conn): + def create_index_psql(conn): conn.rollback() # postgres insists on autocommit for the index conn.set_session(autocommit=True) - c = conn.cursor() - c.execute(sql) - conn.set_session(autocommit=False) - def create_index(conn): + try: + c = conn.cursor() + + # If a previous attempt to create the index was interrupted, + # we may already have a half-built index. Let's just drop it + # before trying to create it again. + + sql = "DROP INDEX IF EXISTS %s" % (index_name,) + logger.debug("[SQL] %s", sql) + c.execute(sql) + + sql = ( + "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s" + " (%(columns)s) %(where_clause)s" + ) % { + "name": index_name, + "table": table, + "columns": ", ".join(columns), + "where_clause": "WHERE " + where_clause if where_clause else "" + } + logger.debug("[SQL] %s", sql) + c.execute(sql) + finally: + conn.set_session(autocommit=False) + + def create_index_sqlite(conn): + # Sqlite doesn't support concurrent creation of indexes. + # + # We don't use partial indices on SQLite as it wasn't introduced + # until 3.8, and wheezy has 3.7 + # + # We assume that sqlite doesn't give us invalid indices; however + # we may still end up with the index existing but the + # background_updates not having been recorded if synapse got shut + # down at the wrong moment - hance we use IF NOT EXISTS. (SQLite + # has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.) + sql = ( + "CREATE INDEX IF NOT EXISTS %(name)s ON %(table)s" + " (%(columns)s)" + ) % { + "name": index_name, + "table": table, + "columns": ", ".join(columns), + } + c = conn.cursor() + logger.debug("[SQL] %s", sql) c.execute(sql) + if isinstance(self.database_engine, engines.PostgresEngine): + runner = create_index_psql + else: + runner = create_index_sqlite + @defer.inlineCallbacks def updater(progress, batch_size): logger.info("Adding index %s to %s", index_name, table) - if conc: - yield self.runWithConnection(create_index_concurrently) - else: - yield self.runWithConnection(create_index) + yield self.runWithConnection(runner) yield self._end_background_update(update_name) defer.returnValue(1) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index a3790419dd..d946024c9b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -29,6 +29,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.state import resolve_events from synapse.util.caches.descriptors import cached +from synapse.types import get_domain_from_id from canonicaljson import encode_canonical_json from collections import deque, namedtuple, OrderedDict @@ -49,6 +50,9 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) persist_event_counter = metrics.register_counter("persisted_events") +event_counter = metrics.register_counter( + "persisted_events_sep", labels=["type", "origin_type", "origin_entity"] +) def encode_json(json_object): @@ -371,6 +375,24 @@ class EventsStore(SQLBaseStore): ) persist_event_counter.inc_by(len(chunk)) + for room_id, (_, _, new_state) in current_state_for_room.iteritems(): + self.get_current_state_ids.prefill( + (room_id, ), new_state + ) + + for event, context in chunk: + if context.app_service: + origin_type = "local" + origin_entity = context.app_service.id + elif self.hs.is_mine_id(event.sender): + origin_type = "local" + origin_entity = "*client*" + else: + origin_type = "remote" + origin_entity = get_domain_from_id(event.sender) + + event_counter.inc(event.type, origin_type, origin_entity) + @defer.inlineCallbacks def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): """Calculates the new forward extremeties for a room given events to @@ -419,10 +441,10 @@ class EventsStore(SQLBaseStore): Assumes that we are only persisting events for one room at a time. Returns: - 2-tuple (to_delete, to_insert) where both are state dicts, i.e. - (type, state_key) -> event_id. `to_delete` are the entries to + 3-tuple (to_delete, to_insert, new_state) where both are state dicts, + i.e. (type, state_key) -> event_id. `to_delete` are the entries to first be deleted from current_state_events, `to_insert` are entries - to insert. + to insert. `new_state` is the full set of state. May return None if there are no changes to be applied. """ # Now we need to work out the different state sets for @@ -529,7 +551,7 @@ class EventsStore(SQLBaseStore): if ev_id in events_to_insert } - defer.returnValue((to_delete, to_insert)) + defer.returnValue((to_delete, to_insert, current_state)) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, @@ -682,7 +704,7 @@ class EventsStore(SQLBaseStore): def _update_current_state_txn(self, txn, state_delta_by_room): for room_id, current_state_tuple in state_delta_by_room.iteritems(): - to_delete, to_insert = current_state_tuple + to_delete, to_insert, _ = current_state_tuple txn.executemany( "DELETE FROM current_state_events WHERE event_id = ?", [(ev_id,) for ev_id in to_delete.itervalues()], diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py index cbec255966..353a135c4e 100644 --- a/synapse/storage/push_rule.py +++ b/synapse/storage/push_rule.py @@ -188,7 +188,7 @@ class PushRuleStore(SQLBaseStore): user_ids, on_invalidate=cache_context.invalidate, ) - rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None} + rules_by_user = {k: v for k, v in rules_by_user.iteritems() if v is not None} defer.returnValue(rules_by_user) @@ -398,7 +398,8 @@ class PushRuleStore(SQLBaseStore): with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids yield self.runInteraction( - "delete_push_rule", delete_push_rule_txn, stream_id, event_stream_ordering + "delete_push_rule", delete_push_rule_txn, stream_id, + event_stream_ordering, ) @defer.inlineCallbacks diff --git a/synapse/storage/state.py b/synapse/storage/state.py index e89001d994..03981f5d2b 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -16,6 +16,7 @@ from ._base import SQLBaseStore from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches import intern_string +from synapse.util.stringutils import to_ascii from synapse.storage.engines import PostgresEngine from twisted.internet import defer @@ -89,7 +90,7 @@ class StateStore(SQLBaseStore): ) return { - (r[0], r[1]): r[2] for r in txn + (intern_string(r[0]), intern_string(r[1])): to_ascii(r[2]) for r in txn } return self.runInteraction( @@ -226,6 +227,18 @@ class StateStore(SQLBaseStore): ], ) + # Prefill the state group cache with this group. + # It's fine to use the sequence like this as the state group map + # is immutable. (If the map wasn't immutable then this prefill could + # race with another update) + txn.call_after( + self._state_group_cache.update, + self._state_group_cache.sequence, + key=context.state_group, + value=context.current_state_ids, + full=True, + ) + self._simple_insert_many_txn( txn, table="event_to_state_groups", @@ -655,7 +668,7 @@ class StateStore(SQLBaseStore): state_dict = results[group] state_dict.update( - ((intern_string(k[0]), intern_string(k[1])), v) + ((intern_string(k[0]), intern_string(k[1])), to_ascii(v)) for k, v in group_state_dict.iteritems() ) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 98a5a26ac5..2a2360ab5d 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -26,7 +26,7 @@ logger = logging.getLogger(__name__) class DeferredTimedOutError(SynapseError): def __init__(self): - super(SynapseError, self).__init__(504, "Timed out") + super(DeferredTimedOutError, self).__init__(504, "Timed out") def unwrapFirstError(failure): diff --git a/synapse/visibility.py b/synapse/visibility.py index c4dd9ae2c7..5590b866ed 100644 --- a/synapse/visibility.py +++ b/synapse/visibility.py @@ -189,25 +189,6 @@ def filter_events_for_clients(store, user_tuples, events, event_id_to_state): @defer.inlineCallbacks -def filter_events_for_clients_context(store, user_tuples, events, event_id_to_context): - user_ids = set(u[0] for u in user_tuples) - event_id_to_state = {} - for event_id, context in event_id_to_context.items(): - state = yield store.get_events([ - e_id - for key, e_id in context.current_state_ids.iteritems() - if key == (EventTypes.RoomHistoryVisibility, "") - or (key[0] == EventTypes.Member and key[1] in user_ids) - ]) - event_id_to_state[event_id] = state - - res = yield filter_events_for_clients( - store, user_tuples, events, event_id_to_state - ) - defer.returnValue(res) - - -@defer.inlineCallbacks def filter_events_for_client(store, user_id, events, is_peeking=False): """ Check which events a user is allowed to see diff --git a/tests/storage/event_injector.py b/tests/storage/event_injector.py index 38556da9a7..024ac15069 100644 --- a/tests/storage/event_injector.py +++ b/tests/storage/event_injector.py @@ -27,10 +27,10 @@ class EventInjector: self.event_builder_factory = hs.get_event_builder_factory() @defer.inlineCallbacks - def create_room(self, room): + def create_room(self, room, user): builder = self.event_builder_factory.new({ "type": EventTypes.Create, - "sender": "", + "sender": user.to_string(), "room_id": room.to_string(), "content": {}, }) diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py index 3762b38e37..14443b53bc 100644 --- a/tests/storage/test_events.py +++ b/tests/storage/test_events.py @@ -50,7 +50,7 @@ class EventsStoreTestCase(unittest.TestCase): # Create something to report room = RoomID.from_string("!abc123:test") user = UserID.from_string("@raccoonlover:test") - yield self.event_injector.create_room(room) + yield self.event_injector.create_room(room, user) self.base_event = yield self._get_last_stream_token() |