diff --git a/CHANGES.rst b/CHANGES.rst
index babeaa0ded..476d6fb6b4 100644
--- a/CHANGES.rst
+++ b/CHANGES.rst
@@ -1,3 +1,28 @@
+Changes in synapse v0.21.0-rc3 (2017-05-17)
+===========================================
+
+Features:
+
+* Add per user rate-limiting overrides (PR #2208)
+* Add config option to limit maximum number of events requested by ``/sync``
+ and ``/messages`` (PR #2221) Thanks to @psaavedra!
+
+
+Changes:
+
+* Various small performance fixes (PR #2201, #2202, #2224, #2226, #2227, #2228,
+ #2229)
+* Update username availability checker API (PR #2209, #2213)
+* When purging, don't de-delta state groups we're about to delete (PR #2214)
+* Documentation to check synapse version (PR #2215) Thanks to @hamber-dick!
+* Add an index to event_search to speed up purge history API (PR #2218)
+
+
+Bug fixes:
+
+* Fix API to allow clients to upload one-time-keys with new sigs (PR #2206)
+
+
Changes in synapse v0.21.0-rc2 (2017-05-08)
===========================================
diff --git a/UPGRADE.rst b/UPGRADE.rst
index 9f044719a0..6164df8833 100644
--- a/UPGRADE.rst
+++ b/UPGRADE.rst
@@ -28,6 +28,15 @@ running:
git pull
# Update the versions of synapse's python dependencies.
python synapse/python_dependencies.py | xargs -n1 pip install --upgrade
+
+To check whether your update was sucessfull, run:
+
+.. code:: bash
+
+ # replace your.server.domain with ther domain of your synaspe homeserver
+ curl https://<your.server.domain>/_matrix/federation/v1/version
+
+So for the Matrix.org HS server the URL would be: https://matrix.org/_matrix/federation/v1/version.
Upgrading to v0.15.0
diff --git a/synapse/__init__.py b/synapse/__init__.py
index d4ad23fa3d..c39dde913f 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -16,4 +16,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.21.0-rc2"
+__version__ = "0.21.0-rc3"
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 25e6666238..3910b9dc31 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -35,6 +35,8 @@ class ServerConfig(Config):
# "disable" federation
self.send_federation = config.get("send_federation", True)
+ self.filter_timeline_limit = config.get("filter_timeline_limit", -1)
+
if self.public_baseurl is not None:
if self.public_baseurl[-1] != '/':
self.public_baseurl += '/'
@@ -161,6 +163,10 @@ class ServerConfig(Config):
# The GC threshold parameters to pass to `gc.set_threshold`, if defined
# gc_thresholds: [700, 10, 10]
+ # Set the limit on the returned events in the timeline in the get
+ # and sync operations. The default value is -1, means no upper limit.
+ # filter_timeline_limit: 5000
+
# List of ports that Synapse should listen on, their purpose and their
# configuration.
listeners:
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 695f1a7375..a15198e05d 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -285,7 +285,7 @@ class TransactionQueue(object):
Args:
states (list(UserPresenceState))
"""
- hosts_and_states = yield get_interested_remotes(self.store, states)
+ hosts_and_states = yield get_interested_remotes(self.store, states, self.state)
for destinations, states in hosts_and_states:
for destination in destinations:
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index f3707afcd0..c7c0b0a1e2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -780,12 +780,12 @@ class PresenceHandler(object):
# don't need to send to local clients here, as that is done as part
# of the event stream/sync.
# TODO: Only send to servers not already in the room.
- user_ids = yield self.store.get_users_in_room(room_id)
if self.is_mine(user):
state = yield self.current_state_for_user(user.to_string())
self._push_to_remotes([state])
else:
+ user_ids = yield self.store.get_users_in_room(room_id)
user_ids = filter(self.is_mine_id, user_ids)
states = yield self.current_state_for_users(user_ids)
@@ -1322,7 +1322,7 @@ def get_interested_parties(store, states):
@defer.inlineCallbacks
-def get_interested_remotes(store, states):
+def get_interested_remotes(store, states, state_handler):
"""Given a list of presence states figure out which remote servers
should be sent which.
@@ -1345,7 +1345,7 @@ def get_interested_remotes(store, states):
room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
for room_id, states in room_ids_to_states.iteritems():
- hosts = yield store.get_hosts_in_room(room_id)
+ hosts = yield state_handler.get_current_hosts_in_room(room_id)
hosts_and_states.append((hosts, states))
for user_id, states in users_to_states.iteritems():
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ab87632d99..1ca88517a2 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -739,10 +739,11 @@ class RoomMemberHandler(BaseHandler):
if len(current_state_ids) == 1 and create_event_id:
defer.returnValue(self.hs.is_mine_id(create_event_id))
- for (etype, state_key), event_id in current_state_ids.items():
+ for etype, state_key in current_state_ids:
if etype != EventTypes.Member or not self.hs.is_mine_id(state_key):
continue
+ event_id = current_state_ids[(etype, state_key)]
event = yield self.store.get_event(event_id, allow_none=True)
if not event:
continue
diff --git a/synapse/rest/client/v2_alpha/_base.py b/synapse/rest/client/v2_alpha/_base.py
index 20e765f48f..1f5bc24cc3 100644
--- a/synapse/rest/client/v2_alpha/_base.py
+++ b/synapse/rest/client/v2_alpha/_base.py
@@ -47,3 +47,13 @@ def client_v2_patterns(path_regex, releases=(0,),
new_prefix = CLIENT_V2_ALPHA_PREFIX.replace("/v2_alpha", "/r%d" % release)
patterns.append(re.compile("^" + new_prefix + path_regex))
return patterns
+
+
+def set_timeline_upper_limit(filter_json, filter_timeline_limit):
+ if filter_timeline_limit < 0:
+ return # no upper limits
+ timeline = filter_json.get('room', {}).get('timeline', {})
+ if 'limit' in timeline:
+ filter_json['room']['timeline']["limit"] = min(
+ filter_json['room']['timeline']['limit'],
+ filter_timeline_limit)
diff --git a/synapse/rest/client/v2_alpha/filter.py b/synapse/rest/client/v2_alpha/filter.py
index b4084fec62..d2b2fd66e6 100644
--- a/synapse/rest/client/v2_alpha/filter.py
+++ b/synapse/rest/client/v2_alpha/filter.py
@@ -20,6 +20,7 @@ from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.types import UserID
from ._base import client_v2_patterns
+from ._base import set_timeline_upper_limit
import logging
@@ -85,6 +86,11 @@ class CreateFilterRestServlet(RestServlet):
raise AuthError(403, "Can only create filters for local users")
content = parse_json_object_from_request(request)
+ set_timeline_upper_limit(
+ content,
+ self.hs.config.filter_timeline_limit
+ )
+
filter_id = yield self.filtering.add_user_filter(
user_localpart=target_user.localpart,
user_filter=content,
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index f30eab76fd..771e127ab9 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -28,6 +28,7 @@ from synapse.api.filtering import FilterCollection, DEFAULT_FILTER_COLLECTION
from synapse.api.errors import SynapseError
from synapse.api.constants import PresenceState
from ._base import client_v2_patterns
+from ._base import set_timeline_upper_limit
import itertools
import logging
@@ -78,6 +79,7 @@ class SyncRestServlet(RestServlet):
def __init__(self, hs):
super(SyncRestServlet, self).__init__()
+ self.hs = hs
self.auth = hs.get_auth()
self.sync_handler = hs.get_sync_handler()
self.clock = hs.get_clock()
@@ -121,6 +123,8 @@ class SyncRestServlet(RestServlet):
if filter_id.startswith('{'):
try:
filter_object = json.loads(filter_id)
+ set_timeline_upper_limit(filter_object,
+ self.hs.config.filter_timeline_limit)
except:
raise SynapseError(400, "Invalid filter JSON")
self.filtering.check_valid_filter(filter_object)
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index c43b185e08..caca96c222 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -34,6 +34,7 @@ from synapse.api.errors import SynapseError, HttpResponseException, \
from synapse.util.async import Linearizer
from synapse.util.stringutils import is_ascii
from synapse.util.logcontext import preserve_context_over_fn
+from synapse.util.retryutils import NotRetryingDestination
import os
import errno
@@ -181,7 +182,8 @@ class MediaRepository(object):
logger.exception("Failed to fetch remote media %s/%s",
server_name, media_id)
raise
-
+ except NotRetryingDestination:
+ logger.warn("Not retrying destination %r", server_name)
except Exception:
logger.exception("Failed to fetch remote media %s/%s",
server_name, media_id)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index c659004e8d..58b73af7d2 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -60,12 +60,12 @@ class LoggingTransaction(object):
object.__setattr__(self, "database_engine", database_engine)
object.__setattr__(self, "after_callbacks", after_callbacks)
- def call_after(self, callback, *args):
+ def call_after(self, callback, *args, **kwargs):
"""Call the given callback on the main twisted thread after the
transaction has finished. Used to invalidate the caches on the
correct thread.
"""
- self.after_callbacks.append((callback, args))
+ self.after_callbacks.append((callback, args, kwargs))
def __getattr__(self, name):
return getattr(self.txn, name)
@@ -319,8 +319,8 @@ class SQLBaseStore(object):
inner_func, *args, **kwargs
)
finally:
- for after_callback, after_args in after_callbacks:
- after_callback(*after_args)
+ for after_callback, after_args, after_kwargs in after_callbacks:
+ after_callback(*after_args, **after_kwargs)
defer.returnValue(result)
@defer.inlineCallbacks
diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py
index d4cf0fc59b..7157fb1dfb 100644
--- a/synapse/storage/background_updates.py
+++ b/synapse/storage/background_updates.py
@@ -210,7 +210,9 @@ class BackgroundUpdateStore(SQLBaseStore):
self._background_update_handlers[update_name] = update_handler
def register_background_index_update(self, update_name, index_name,
- table, columns, where_clause=None):
+ table, columns, where_clause=None,
+ unique=False,
+ psql_only=False):
"""Helper for store classes to do a background index addition
To use:
@@ -226,6 +228,9 @@ class BackgroundUpdateStore(SQLBaseStore):
index_name (str): name of index to add
table (str): table to add index to
columns (list[str]): columns/expressions to include in index
+ unique (bool): true to make a UNIQUE index
+ psql_only: true to only create this index on psql databases (useful
+ for virtual sqlite tables)
"""
def create_index_psql(conn):
@@ -245,9 +250,11 @@ class BackgroundUpdateStore(SQLBaseStore):
c.execute(sql)
sql = (
- "CREATE INDEX CONCURRENTLY %(name)s ON %(table)s"
+ "CREATE %(unique)s INDEX CONCURRENTLY %(name)s"
+ " ON %(table)s"
" (%(columns)s) %(where_clause)s"
) % {
+ "unique": "UNIQUE" if unique else "",
"name": index_name,
"table": table,
"columns": ", ".join(columns),
@@ -270,9 +277,10 @@ class BackgroundUpdateStore(SQLBaseStore):
# down at the wrong moment - hance we use IF NOT EXISTS. (SQLite
# has supported CREATE TABLE|INDEX IF NOT EXISTS since 3.3.0.)
sql = (
- "CREATE INDEX IF NOT EXISTS %(name)s ON %(table)s"
+ "CREATE %(unique)s INDEX IF NOT EXISTS %(name)s ON %(table)s"
" (%(columns)s)"
) % {
+ "unique": "UNIQUE" if unique else "",
"name": index_name,
"table": table,
"columns": ", ".join(columns),
@@ -284,13 +292,16 @@ class BackgroundUpdateStore(SQLBaseStore):
if isinstance(self.database_engine, engines.PostgresEngine):
runner = create_index_psql
+ elif psql_only:
+ runner = None
else:
runner = create_index_sqlite
@defer.inlineCallbacks
def updater(progress, batch_size):
- logger.info("Adding index %s to %s", index_name, table)
- yield self.runWithConnection(runner)
+ if runner is not None:
+ logger.info("Adding index %s to %s", index_name, table)
+ yield self.runWithConnection(runner)
yield self._end_background_update(update_name)
defer.returnValue(1)
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2ab44ceaa7..c4aeb48800 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -207,6 +207,18 @@ class EventsStore(SQLBaseStore):
where_clause="contains_url = true AND outlier = false",
)
+ # an event_id index on event_search is useful for the purge_history
+ # api. Plus it means we get to enforce some integrity with a UNIQUE
+ # clause
+ self.register_background_index_update(
+ "event_search_event_id_idx",
+ index_name="event_search_event_id_idx",
+ table="event_search",
+ columns=["event_id"],
+ unique=True,
+ psql_only=True,
+ )
+
self._event_persist_queue = _EventPeristenceQueue()
def persist_events(self, events_and_contexts, backfilled=False):
@@ -387,6 +399,11 @@ class EventsStore(SQLBaseStore):
event_counter.inc(event.type, origin_type, origin_entity)
+ for room_id, (_, _, new_state) in current_state_for_room.iteritems():
+ self.get_current_state_ids.prefill(
+ (room_id, ), new_state
+ )
+
@defer.inlineCallbacks
def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
"""Calculates the new forward extremeties for a room given events to
@@ -435,10 +452,10 @@ class EventsStore(SQLBaseStore):
Assumes that we are only persisting events for one room at a time.
Returns:
- 2-tuple (to_delete, to_insert) where both are state dicts, i.e.
- (type, state_key) -> event_id. `to_delete` are the entries to
+ 3-tuple (to_delete, to_insert, new_state) where both are state dicts,
+ i.e. (type, state_key) -> event_id. `to_delete` are the entries to
first be deleted from current_state_events, `to_insert` are entries
- to insert.
+ to insert. `new_state` is the full set of state.
May return None if there are no changes to be applied.
"""
# Now we need to work out the different state sets for
@@ -545,7 +562,7 @@ class EventsStore(SQLBaseStore):
if ev_id in events_to_insert
}
- defer.returnValue((to_delete, to_insert))
+ defer.returnValue((to_delete, to_insert, current_state))
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
@@ -698,7 +715,7 @@ class EventsStore(SQLBaseStore):
def _update_current_state_txn(self, txn, state_delta_by_room):
for room_id, current_state_tuple in state_delta_by_room.iteritems():
- to_delete, to_insert = current_state_tuple
+ to_delete, to_insert, _ = current_state_tuple
txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?",
[(ev_id,) for ev_id in to_delete.itervalues()],
@@ -2022,6 +2039,8 @@ class EventsStore(SQLBaseStore):
400, "topological_ordering is greater than forward extremeties"
)
+ logger.debug("[purge] looking for events to delete")
+
txn.execute(
"SELECT event_id, state_key FROM events"
" LEFT JOIN state_events USING (room_id, event_id)"
@@ -2030,9 +2049,19 @@ class EventsStore(SQLBaseStore):
)
event_rows = txn.fetchall()
+ to_delete = [
+ (event_id,) for event_id, state_key in event_rows
+ if state_key is None and not self.hs.is_mine_id(event_id)
+ ]
+ logger.info(
+ "[purge] found %i events before cutoff, of which %i are remote"
+ " non-state events to delete", len(event_rows), len(to_delete))
+
for event_id, state_key in event_rows:
txn.call_after(self._get_state_group_for_event.invalidate, (event_id,))
+ logger.debug("[purge] Finding new backward extremities")
+
# We calculate the new entries for the backward extremeties by finding
# all events that point to events that are to be purged
txn.execute(
@@ -2045,6 +2074,8 @@ class EventsStore(SQLBaseStore):
)
new_backwards_extrems = txn.fetchall()
+ logger.debug("[purge] replacing backward extremities: %r", new_backwards_extrems)
+
txn.execute(
"DELETE FROM event_backward_extremities WHERE room_id = ?",
(room_id,)
@@ -2059,6 +2090,8 @@ class EventsStore(SQLBaseStore):
]
)
+ logger.debug("[purge] finding redundant state groups")
+
# Get all state groups that are only referenced by events that are
# to be deleted.
txn.execute(
@@ -2074,15 +2107,20 @@ class EventsStore(SQLBaseStore):
)
state_rows = txn.fetchall()
- state_groups_to_delete = [sg for sg, in state_rows]
+ logger.debug("[purge] found %i redundant state groups", len(state_rows))
+
+ # make a set of the redundant state groups, so that we can look them up
+ # efficiently
+ state_groups_to_delete = set([sg for sg, in state_rows])
# Now we get all the state groups that rely on these state groups
- new_state_edges = []
- chunks = [
- state_groups_to_delete[i:i + 100]
- for i in xrange(0, len(state_groups_to_delete), 100)
- ]
- for chunk in chunks:
+ logger.debug("[purge] finding state groups which depend on redundant"
+ " state groups")
+ remaining_state_groups = []
+ for i in xrange(0, len(state_rows), 100):
+ chunk = [sg for sg, in state_rows[i:i + 100]]
+ # look for state groups whose prev_state_group is one we are about
+ # to delete
rows = self._simple_select_many_txn(
txn,
table="state_group_edges",
@@ -2091,21 +2129,28 @@ class EventsStore(SQLBaseStore):
retcols=["state_group"],
keyvalues={},
)
- new_state_edges.extend(row["state_group"] for row in rows)
+ remaining_state_groups.extend(
+ row["state_group"] for row in rows
+
+ # exclude state groups we are about to delete: no point in
+ # updating them
+ if row["state_group"] not in state_groups_to_delete
+ )
- # Now we turn the state groups that reference to-be-deleted state groups
- # to non delta versions.
- for new_state_edge in new_state_edges:
+ # Now we turn the state groups that reference to-be-deleted state
+ # groups to non delta versions.
+ for sg in remaining_state_groups:
+ logger.debug("[purge] de-delta-ing remaining state group %s", sg)
curr_state = self._get_state_groups_from_groups_txn(
- txn, [new_state_edge], types=None
+ txn, [sg], types=None
)
- curr_state = curr_state[new_state_edge]
+ curr_state = curr_state[sg]
self._simple_delete_txn(
txn,
table="state_groups_state",
keyvalues={
- "state_group": new_state_edge,
+ "state_group": sg,
}
)
@@ -2113,7 +2158,7 @@ class EventsStore(SQLBaseStore):
txn,
table="state_group_edges",
keyvalues={
- "state_group": new_state_edge,
+ "state_group": sg,
}
)
@@ -2122,7 +2167,7 @@ class EventsStore(SQLBaseStore):
table="state_groups_state",
values=[
{
- "state_group": new_state_edge,
+ "state_group": sg,
"room_id": room_id,
"type": key[0],
"state_key": key[1],
@@ -2132,6 +2177,7 @@ class EventsStore(SQLBaseStore):
],
)
+ logger.debug("[purge] removing redundant state groups")
txn.executemany(
"DELETE FROM state_groups_state WHERE state_group = ?",
state_rows
@@ -2140,22 +2186,21 @@ class EventsStore(SQLBaseStore):
"DELETE FROM state_groups WHERE id = ?",
state_rows
)
+
# Delete all non-state
+ logger.debug("[purge] removing events from event_to_state_groups")
txn.executemany(
"DELETE FROM event_to_state_groups WHERE event_id = ?",
[(event_id,) for event_id, _ in event_rows]
)
+ logger.debug("[purge] updating room_depth")
txn.execute(
"UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
(topological_ordering, room_id,)
)
# Delete all remote non-state events
- to_delete = [
- (event_id,) for event_id, state_key in event_rows
- if state_key is None and not self.hs.is_mine_id(event_id)
- ]
for table in (
"events",
"event_json",
@@ -2171,16 +2216,15 @@ class EventsStore(SQLBaseStore):
"event_signatures",
"rejections",
):
+ logger.debug("[purge] removing remote non-state events from %s", table)
+
txn.executemany(
"DELETE FROM %s WHERE event_id = ?" % (table,),
to_delete
)
- txn.executemany(
- "DELETE FROM events WHERE event_id = ?",
- to_delete
- )
# Mark all state and own events as outliers
+ logger.debug("[purge] marking remaining events as outliers")
txn.executemany(
"UPDATE events SET outlier = ?"
" WHERE event_id = ?",
@@ -2190,6 +2234,8 @@ class EventsStore(SQLBaseStore):
]
)
+ logger.info("[purge] done")
+
@defer.inlineCallbacks
def is_event_after(self, event_id1, event_id2):
"""Returns True if event_id1 is after event_id2 in the stream
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 2fa20bd87c..0829ae5bee 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -387,7 +387,9 @@ class RoomMemberStore(SQLBaseStore):
state_group = object()
return self._get_joined_users_from_context(
- event.room_id, state_group, context.current_state_ids, event=event,
+ event.room_id, state_group, context.current_state_ids,
+ event=event,
+ context=context,
)
def get_joined_users_from_state(self, room_id, state_group, state_ids):
@@ -405,19 +407,39 @@ class RoomMemberStore(SQLBaseStore):
@cachedInlineCallbacks(num_args=2, cache_context=True, iterable=True,
max_entries=100000)
def _get_joined_users_from_context(self, room_id, state_group, current_state_ids,
- cache_context, event=None):
+ cache_context, event=None, context=None):
# We don't use `state_group`, it's there so that we can cache based
# on it. However, it's important that it's never None, since two current_states
# with a state_group of None are likely to be different.
# See bulk_get_push_rules_for_room for how we work around this.
assert state_group is not None
+ users_in_room = {}
member_event_ids = [
e_id
for key, e_id in current_state_ids.iteritems()
if key[0] == EventTypes.Member
]
+ if context is not None:
+ # If we have a context with a delta from a previous state group,
+ # check if we also have the result from the previous group in cache.
+ # If we do then we can reuse that result and simply update it with
+ # any membership changes in `delta_ids`
+ if context.prev_group and context.delta_ids:
+ prev_res = self._get_joined_users_from_context.cache.get(
+ (room_id, context.prev_group), None
+ )
+ if prev_res and isinstance(prev_res, dict):
+ users_in_room = dict(prev_res)
+ member_event_ids = [
+ e_id
+ for key, e_id in context.delta_ids.iteritems()
+ if key[0] == EventTypes.Member
+ ]
+ for etype, state_key in context.delta_ids:
+ users_in_room.pop(state_key, None)
+
# We check if we have any of the member event ids in the event cache
# before we ask the DB
@@ -431,7 +453,6 @@ class RoomMemberStore(SQLBaseStore):
)
missing_member_event_ids = []
- users_in_room = {}
for event_id in member_event_ids:
ev_entry = event_map.get(event_id)
if ev_entry:
@@ -534,7 +555,7 @@ class RoomMemberStore(SQLBaseStore):
assert state_group is not None
joined_hosts = set()
- for (etype, state_key), event_id in current_state_ids.items():
+ for etype, state_key in current_state_ids:
if etype == EventTypes.Member:
try:
host = get_domain_from_id(state_key)
@@ -545,6 +566,7 @@ class RoomMemberStore(SQLBaseStore):
if host in joined_hosts:
continue
+ event_id = current_state_ids[(etype, state_key)]
event = yield self.get_event(event_id, allow_none=True)
if event and event.content["membership"] == Membership.JOIN:
joined_hosts.add(intern_string(host))
diff --git a/synapse/storage/schema/delta/37/remove_auth_idx.py b/synapse/storage/schema/delta/37/remove_auth_idx.py
index 784f3b348f..20ad8bd5a6 100644
--- a/synapse/storage/schema/delta/37/remove_auth_idx.py
+++ b/synapse/storage/schema/delta/37/remove_auth_idx.py
@@ -36,6 +36,10 @@ DROP INDEX IF EXISTS transactions_have_ref;
-- and is used incredibly rarely.
DROP INDEX IF EXISTS events_order_topo_stream_room;
+-- an equivalent index to this actually gets re-created in delta 41, because it
+-- turned out that deleting it wasn't a great plan :/. In any case, let's
+-- delete it here, and delta 41 will create a new one with an added UNIQUE
+-- constraint
DROP INDEX IF EXISTS event_search_ev_idx;
"""
diff --git a/synapse/storage/schema/delta/41/event_search_event_id_idx.sql b/synapse/storage/schema/delta/41/event_search_event_id_idx.sql
new file mode 100644
index 0000000000..5d9cfecf36
--- /dev/null
+++ b/synapse/storage/schema/delta/41/event_search_event_id_idx.sql
@@ -0,0 +1,17 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+ VALUES ('event_search_event_id_idx', '{}');
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index a16afa8df5..85acf2ad1e 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -227,6 +227,18 @@ class StateStore(SQLBaseStore):
],
)
+ # Prefill the state group cache with this group.
+ # It's fine to use the sequence like this as the state group map
+ # is immutable. (If the map wasn't immutable then this prefill could
+ # race with another update)
+ txn.call_after(
+ self._state_group_cache.update,
+ self._state_group_cache.sequence,
+ key=context.state_group,
+ value=dict(context.current_state_ids),
+ full=True,
+ )
+
self._simple_insert_many_txn(
txn,
table="event_to_state_groups",
diff --git a/synapse/types.py b/synapse/types.py
index c87ed813b9..445bdcb4d7 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -56,10 +56,10 @@ def create_requester(user_id, access_token_id=None, is_guest=False,
def get_domain_from_id(string):
- try:
- return string.split(":", 1)[1]
- except IndexError:
+ idx = string.find(":")
+ if idx == -1:
raise SynapseError(400, "Invalid ID: %r" % (string,))
+ return string[idx + 1:]
class DomainSpecificString(
|