diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index e78f8d0114..c22762eb5c 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -448,6 +448,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
"add_push_actions_to_staging", _add_push_actions_to_staging_txn
)
+ @defer.inlineCallbacks
def remove_push_actions_from_staging(self, event_id):
"""Called if we failed to persist the event to ensure that stale push
actions don't build up in the DB
@@ -456,13 +457,22 @@ class EventPushActionsWorkerStore(SQLBaseStore):
event_id (str)
"""
- return self._simple_delete(
- table="event_push_actions_staging",
- keyvalues={
- "event_id": event_id,
- },
- desc="remove_push_actions_from_staging",
- )
+ try:
+ res = yield self._simple_delete(
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event_id,
+ },
+ desc="remove_push_actions_from_staging",
+ )
+ defer.returnValue(res)
+ except Exception:
+ # this method is called from an exception handler, so propagating
+ # another exception here really isn't helpful - there's nothing
+ # the caller can do about it. Just log the exception and move on.
+ logger.exception(
+ "Error removing push actions after event persistence failure",
+ )
@defer.inlineCallbacks
def _find_stream_orderings_for_times(self):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5fe4a0e56c..05cde96afc 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -22,7 +22,6 @@ import logging
import simplejson as json
from twisted.internet import defer
-
from synapse.storage.events_worker import EventsWorkerStore
from synapse.util.async import ObservableDeferred
from synapse.util.frozenutils import frozendict_json_encoder
@@ -425,7 +424,9 @@ class EventsStore(EventsWorkerStore):
)
current_state = yield self._get_new_state_after_events(
room_id,
- ev_ctx_rm, new_latest_event_ids,
+ ev_ctx_rm,
+ latest_event_ids,
+ new_latest_event_ids,
)
if current_state is not None:
current_state_for_room[room_id] = current_state
@@ -513,7 +514,8 @@ class EventsStore(EventsWorkerStore):
defer.returnValue(new_latest_event_ids)
@defer.inlineCallbacks
- def _get_new_state_after_events(self, room_id, events_context, new_latest_event_ids):
+ def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids,
+ new_latest_event_ids):
"""Calculate the current state dict after adding some new events to
a room
@@ -524,6 +526,9 @@ class EventsStore(EventsWorkerStore):
events_context (list[(EventBase, EventContext)]):
events and contexts which are being added to the room
+ old_latest_event_ids (iterable[str]):
+ the old forward extremities for the room.
+
new_latest_event_ids (iterable[str]):
the new forward extremities for the room.
@@ -534,64 +539,89 @@ class EventsStore(EventsWorkerStore):
"""
if not new_latest_event_ids:
- defer.returnValue({})
+ return
# map from state_group to ((type, key) -> event_id) state map
- state_groups = {}
- missing_event_ids = []
- was_updated = False
+ state_groups_map = {}
+ for ev, ctx in events_context:
+ if ctx.state_group is None:
+ # I don't think this can happen, but let's double-check
+ raise Exception(
+ "Context for new extremity event %s has no state "
+ "group" % (ev.event_id, ),
+ )
+
+ if ctx.state_group in state_groups_map:
+ continue
+
+ state_groups_map[ctx.state_group] = ctx.current_state_ids
+
+ # We need to map the event_ids to their state groups. First, let's
+ # check if the event is one we're persisting, in which case we can
+ # pull the state group from its context.
+ # Otherwise we need to pull the state group from the database.
+
+ # Set of events we need to fetch groups for. (We know none of the old
+ # extremities are going to be in events_context).
+ missing_event_ids = set(old_latest_event_ids)
+
+ event_id_to_state_group = {}
for event_id in new_latest_event_ids:
- # First search in the list of new events we're adding,
- # and then use the current state from that
+ # First search in the list of new events we're adding.
for ev, ctx in events_context:
if event_id == ev.event_id:
- if ctx.current_state_ids is None:
- raise Exception("Unknown current state")
-
- if ctx.state_group is None:
- # I don't think this can happen, but let's double-check
- raise Exception(
- "Context for new extremity event %s has no state "
- "group" % (event_id, ),
- )
-
- # If we've already seen the state group don't bother adding
- # it to the state sets again
- if ctx.state_group not in state_groups:
- state_groups[ctx.state_group] = ctx.current_state_ids
- if ctx.delta_ids or hasattr(ev, "state_key"):
- was_updated = True
+ event_id_to_state_group[event_id] = ctx.state_group
break
else:
# If we couldn't find it, then we'll need to pull
# the state from the database
- was_updated = True
- missing_event_ids.append(event_id)
-
- if not was_updated:
- return
+ missing_event_ids.add(event_id)
if missing_event_ids:
- # Now pull out the state for any missing events from DB
+ # Now pull out the state groups for any missing events from DB
event_to_groups = yield self._get_state_group_for_events(
missing_event_ids,
)
+ event_id_to_state_group.update(event_to_groups)
+
+ # State groups of old_latest_event_ids
+ old_state_groups = set(
+ event_id_to_state_group[evid] for evid in old_latest_event_ids
+ )
+
+ # State groups of new_latest_event_ids
+ new_state_groups = set(
+ event_id_to_state_group[evid] for evid in new_latest_event_ids
+ )
- groups = set(event_to_groups.itervalues()) - set(state_groups.iterkeys())
+ # If they old and new groups are the same then we don't need to do
+ # anything.
+ if old_state_groups == new_state_groups:
+ return
- if groups:
- group_to_state = yield self._get_state_for_groups(groups)
- state_groups.update(group_to_state)
+ # Now that we have calculated new_state_groups we need to get
+ # their state IDs so we can resolve to a single state set.
+ missing_state = new_state_groups - set(state_groups_map)
+ if missing_state:
+ group_to_state = yield self._get_state_for_groups(missing_state)
+ state_groups_map.update(group_to_state)
- if len(state_groups) == 1:
+ if len(new_state_groups) == 1:
# If there is only one state group, then we know what the current
# state is.
- defer.returnValue(state_groups.values()[0])
+ defer.returnValue(state_groups_map[new_state_groups.pop()])
+
+ # Ok, we need to defer to the state handler to resolve our state sets.
def get_events(ev_ids):
return self.get_events(
ev_ids, get_prev_content=False, check_redacted=False,
)
+
+ state_groups = {
+ sg: state_groups_map[sg] for sg in new_state_groups
+ }
+
events_map = {ev.event_id: ev for ev, _ in events_context}
logger.debug("calling resolve_state_groups from preserve_events")
res = yield self._state_resolution_handler.resolve_state_groups(
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index a937b9bceb..ba834854e1 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -20,7 +20,7 @@ from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
from synapse.util.logcontext import (
- preserve_fn, PreserveLoggingContext, make_deferred_yieldable
+ PreserveLoggingContext, make_deferred_yieldable, run_in_background,
)
from synapse.util.metrics import Measure
from synapse.api.errors import SynapseError
@@ -319,7 +319,8 @@ class EventsWorkerStore(SQLBaseStore):
res = yield make_deferred_yieldable(defer.gatherResults(
[
- preserve_fn(self._get_event_from_row)(
+ run_in_background(
+ self._get_event_from_row,
row["internal_metadata"], row["json"], row["redacts"],
rejected_reason=row["rejects"],
)
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 6b557ca0cf..a50717db2d 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -22,6 +22,8 @@ from synapse.storage import background_updates
from synapse.storage._base import SQLBaseStore
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
+from six.moves import range
+
class RegistrationWorkerStore(SQLBaseStore):
@cached()
@@ -469,7 +471,7 @@ class RegistrationStore(RegistrationWorkerStore,
match = regex.search(user_id)
if match:
found.add(int(match.group(1)))
- for i in xrange(len(found) + 1):
+ for i in range(len(found) + 1):
if i not in found:
return i
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 740c036975..ea6a189185 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
# Convert the IDs to MXC URIs
for media_id in local_mxcs:
- local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
+ local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
for hostname, media_id in remote_mxcs:
remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
@@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
while next_token:
sql = """
SELECT stream_ordering, json FROM events
- JOIN event_json USING (event_id)
+ JOIN event_json USING (room_id, event_id)
WHERE room_id = ?
AND stream_ordering < ?
AND contains_url = ? AND outlier = ?
@@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
if matches:
hostname = matches.group(1)
media_id = matches.group(2)
- if hostname == self.hostname:
+ if hostname == self.hs.hostname:
local_media_mxcs.append(media_id)
else:
remote_media_mxcs.append((hostname, media_id))
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index c53e53c94f..85bd1a2006 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -14,6 +14,8 @@
import logging
from synapse.config.appservice import load_appservices
+from six.moves import range
+
logger = logging.getLogger(__name__)
@@ -58,7 +60,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
for as_id, user_ids in owned.items():
n = 100
- user_chunks = (user_ids[i:i + 100] for i in xrange(0, len(user_ids), n))
+ user_chunks = (user_ids[i:i + 100] for i in range(0, len(user_ids), n))
for chunk in user_chunks:
cur.execute(
database_engine.convert_param_style(
diff --git a/synapse/storage/schema/delta/48/group_unique_indexes.py b/synapse/storage/schema/delta/48/group_unique_indexes.py
new file mode 100644
index 0000000000..2233af87d7
--- /dev/null
+++ b/synapse/storage/schema/delta/48/group_unique_indexes.py
@@ -0,0 +1,57 @@
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.prepare_database import get_statements
+
+FIX_INDEXES = """
+-- rebuild indexes as uniques
+DROP INDEX groups_invites_g_idx;
+CREATE UNIQUE INDEX group_invites_g_idx ON group_invites(group_id, user_id);
+DROP INDEX groups_users_g_idx;
+CREATE UNIQUE INDEX group_users_g_idx ON group_users(group_id, user_id);
+
+-- rename other indexes to actually match their table names..
+DROP INDEX groups_users_u_idx;
+CREATE INDEX group_users_u_idx ON group_users(user_id);
+DROP INDEX groups_invites_u_idx;
+CREATE INDEX group_invites_u_idx ON group_invites(user_id);
+DROP INDEX groups_rooms_g_idx;
+CREATE UNIQUE INDEX group_rooms_g_idx ON group_rooms(group_id, room_id);
+DROP INDEX groups_rooms_r_idx;
+CREATE INDEX group_rooms_r_idx ON group_rooms(room_id);
+"""
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+ rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
+
+ # remove duplicates from group_users & group_invites tables
+ cur.execute("""
+ DELETE FROM group_users WHERE %s NOT IN (
+ SELECT min(%s) FROM group_users GROUP BY group_id, user_id
+ );
+ """ % (rowid, rowid))
+ cur.execute("""
+ DELETE FROM group_invites WHERE %s NOT IN (
+ SELECT min(%s) FROM group_invites GROUP BY group_id, user_id
+ );
+ """ % (rowid, rowid))
+
+ for statement in get_statements(FIX_INDEXES.splitlines()):
+ cur.execute(statement)
+
+
+def run_upgrade(*args, **kwargs):
+ pass
diff --git a/synapse/storage/search.py b/synapse/storage/search.py
index 426cbe6e1a..6ba3e59889 100644
--- a/synapse/storage/search.py
+++ b/synapse/storage/search.py
@@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore):
sql = (
"SELECT stream_ordering, event_id, room_id, type, json, "
" origin_server_ts FROM events"
- " JOIN event_json USING (event_id)"
+ " JOIN event_json USING (room_id, event_id)"
" WHERE ? <= stream_ordering AND stream_ordering < ?"
" AND (%s)"
" ORDER BY stream_ordering DESC"
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 2956c3b3e0..ea24710ad8 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -38,15 +38,17 @@ from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
from synapse.storage.events import EventsWorkerStore
-from synapse.util.caches.descriptors import cached
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
-from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
+from synapse.storage.engines import PostgresEngine
import abc
import logging
+from six.moves import range
+from collections import namedtuple
+
logger = logging.getLogger(__name__)
@@ -58,6 +60,12 @@ _STREAM_TOKEN = "stream"
_TOPOLOGICAL_TOKEN = "topological"
+# Used as return values for pagination APIs
+_EventDictReturn = namedtuple("_EventDictReturn", (
+ "event_id", "topological_ordering", "stream_ordering",
+))
+
+
def lower_bound(token, engine, inclusive=False):
inclusive = "=" if inclusive else ""
if token.topological is None:
@@ -196,13 +204,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
results = {}
room_ids = list(room_ids)
- for rm_ids in (room_ids[i:i + 20] for i in xrange(0, len(room_ids), 20)):
+ for rm_ids in (room_ids[i:i + 20] for i in range(0, len(room_ids), 20)):
res = yield make_deferred_yieldable(defer.gatherResults([
- preserve_fn(self.get_room_events_stream_for_room)(
+ run_in_background(
+ self.get_room_events_stream_for_room,
room_id, from_key, to_key, limit, order=order,
)
for room_id in rm_ids
- ]))
+ ], consumeErrors=True))
results.update(dict(zip(rm_ids, res)))
defer.returnValue(results)
@@ -224,54 +233,55 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
order='DESC'):
- # Note: If from_key is None then we return in topological order. This
- # is because in that case we're using this as a "get the last few messages
- # in a room" function, rather than "get new messages since last sync"
- if from_key is not None:
- from_id = RoomStreamToken.parse_stream_token(from_key).stream
- else:
- from_id = None
- to_id = RoomStreamToken.parse_stream_token(to_key).stream
+ """Get new room events in stream ordering since `from_key`.
+
+ Args:
+ room_id (str)
+ from_key (str): Token from which no events are returned before
+ to_key (str): Token from which no events are returned after. (This
+ is typically the current stream token)
+ limit (int): Maximum number of events to return
+ order (str): Either "DESC" or "ASC". Determines which events are
+ returned when the result is limited. If "DESC" then the most
+ recent `limit` events are returned, otherwise returns the
+ oldest `limit` events.
+
+ Returns:
+ Deferred[tuple[list[FrozenEvent], str]]: Returns the list of
+ events (in ascending order) and the token from the start of
+ the chunk of events returned.
+ """
if from_key == to_key:
defer.returnValue(([], from_key))
- if from_id:
- has_changed = yield self._events_stream_cache.has_entity_changed(
- room_id, from_id
- )
-
- if not has_changed:
- defer.returnValue(([], from_key))
+ from_id = RoomStreamToken.parse_stream_token(from_key).stream
+ to_id = RoomStreamToken.parse_stream_token(to_key).stream
- def f(txn):
- if from_id is not None:
- sql = (
- "SELECT event_id, stream_ordering FROM events WHERE"
- " room_id = ?"
- " AND not outlier"
- " AND stream_ordering > ? AND stream_ordering <= ?"
- " ORDER BY stream_ordering %s LIMIT ?"
- ) % (order,)
- txn.execute(sql, (room_id, from_id, to_id, limit))
- else:
- sql = (
- "SELECT event_id, stream_ordering FROM events WHERE"
- " room_id = ?"
- " AND not outlier"
- " AND stream_ordering <= ?"
- " ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?"
- ) % (order, order,)
- txn.execute(sql, (room_id, to_id, limit))
+ has_changed = yield self._events_stream_cache.has_entity_changed(
+ room_id, from_id
+ )
- rows = self.cursor_to_dict(txn)
+ if not has_changed:
+ defer.returnValue(([], from_key))
+ def f(txn):
+ sql = (
+ "SELECT event_id, stream_ordering FROM events WHERE"
+ " room_id = ?"
+ " AND not outlier"
+ " AND stream_ordering > ? AND stream_ordering <= ?"
+ " ORDER BY stream_ordering %s LIMIT ?"
+ ) % (order,)
+ txn.execute(sql, (room_id, from_id, to_id, limit))
+
+ rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
return rows
rows = yield self.runInteraction("get_room_events_stream_for_room", f)
ret = yield self._get_events(
- [r["event_id"] for r in rows],
+ [r.event_id for r in rows],
get_prev_content=True
)
@@ -281,7 +291,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
ret.reverse()
if rows:
- key = "s%d" % min(r["stream_ordering"] for r in rows)
+ key = "s%d" % min(r.stream_ordering for r in rows)
else:
# Assume we didn't get anything because there was nothing to
# get.
@@ -291,10 +301,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@defer.inlineCallbacks
def get_membership_changes_for_user(self, user_id, from_key, to_key):
- if from_key is not None:
- from_id = RoomStreamToken.parse_stream_token(from_key).stream
- else:
- from_id = None
+ from_id = RoomStreamToken.parse_stream_token(from_key).stream
to_id = RoomStreamToken.parse_stream_token(to_key).stream
if from_key == to_key:
@@ -308,34 +315,24 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue([])
def f(txn):
- if from_id is not None:
- sql = (
- "SELECT m.event_id, stream_ordering FROM events AS e,"
- " room_memberships AS m"
- " WHERE e.event_id = m.event_id"
- " AND m.user_id = ?"
- " AND e.stream_ordering > ? AND e.stream_ordering <= ?"
- " ORDER BY e.stream_ordering ASC"
- )
- txn.execute(sql, (user_id, from_id, to_id,))
- else:
- sql = (
- "SELECT m.event_id, stream_ordering FROM events AS e,"
- " room_memberships AS m"
- " WHERE e.event_id = m.event_id"
- " AND m.user_id = ?"
- " AND stream_ordering <= ?"
- " ORDER BY stream_ordering ASC"
- )
- txn.execute(sql, (user_id, to_id,))
- rows = self.cursor_to_dict(txn)
+ sql = (
+ "SELECT m.event_id, stream_ordering FROM events AS e,"
+ " room_memberships AS m"
+ " WHERE e.event_id = m.event_id"
+ " AND m.user_id = ?"
+ " AND e.stream_ordering > ? AND e.stream_ordering <= ?"
+ " ORDER BY e.stream_ordering ASC"
+ )
+ txn.execute(sql, (user_id, from_id, to_id,))
+
+ rows = [_EventDictReturn(row[0], None, row[1]) for row in txn]
return rows
rows = yield self.runInteraction("get_membership_changes_for_user", f)
ret = yield self._get_events(
- [r["event_id"] for r in rows],
+ [r.event_id for r in rows],
get_prev_content=True
)
@@ -344,14 +341,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue(ret)
@defer.inlineCallbacks
- def get_recent_events_for_room(self, room_id, limit, end_token, from_token=None):
+ def get_recent_events_for_room(self, room_id, limit, end_token):
+ """Get the most recent events in the room in topological ordering.
+
+ Args:
+ room_id (str)
+ limit (int)
+ end_token (str): The stream token representing now.
+
+ Returns:
+ Deferred[tuple[list[FrozenEvent], str]]: Returns a list of
+ events and a token pointing to the start of the returned
+ events.
+ The events returned are in ascending order.
+ """
+
rows, token = yield self.get_recent_event_ids_for_room(
- room_id, limit, end_token, from_token
+ room_id, limit, end_token,
)
logger.debug("stream before")
events = yield self._get_events(
- [r["event_id"] for r in rows],
+ [r.event_id for r in rows],
get_prev_content=True
)
logger.debug("stream after")
@@ -360,60 +371,36 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
defer.returnValue((events, token))
- @cached(num_args=4)
- def get_recent_event_ids_for_room(self, room_id, limit, end_token, from_token=None):
- end_token = RoomStreamToken.parse_stream_token(end_token)
-
- if from_token is None:
- sql = (
- "SELECT stream_ordering, topological_ordering, event_id"
- " FROM events"
- " WHERE room_id = ? AND stream_ordering <= ? AND outlier = ?"
- " ORDER BY topological_ordering DESC, stream_ordering DESC"
- " LIMIT ?"
- )
- else:
- from_token = RoomStreamToken.parse_stream_token(from_token)
- sql = (
- "SELECT stream_ordering, topological_ordering, event_id"
- " FROM events"
- " WHERE room_id = ? AND stream_ordering > ?"
- " AND stream_ordering <= ? AND outlier = ?"
- " ORDER BY topological_ordering DESC, stream_ordering DESC"
- " LIMIT ?"
- )
-
- def get_recent_events_for_room_txn(txn):
- if from_token is None:
- txn.execute(sql, (room_id, end_token.stream, False, limit,))
- else:
- txn.execute(sql, (
- room_id, from_token.stream, end_token.stream, False, limit
- ))
+ @defer.inlineCallbacks
+ def get_recent_event_ids_for_room(self, room_id, limit, end_token):
+ """Get the most recent events in the room in topological ordering.
- rows = self.cursor_to_dict(txn)
+ Args:
+ room_id (str)
+ limit (int)
+ end_token (str): The stream token representing now.
- rows.reverse() # As we selected with reverse ordering
+ Returns:
+ Deferred[tuple[list[_EventDictReturn], str]]: Returns a list of
+ _EventDictReturn and a token pointing to the start of the returned
+ events.
+ The events returned are in ascending order.
+ """
+ # Allow a zero limit here, and no-op.
+ if limit == 0:
+ defer.returnValue(([], end_token))
- if rows:
- # Tokens are positions between events.
- # This token points *after* the last event in the chunk.
- # We need it to point to the event before it in the chunk
- # since we are going backwards so we subtract one from the
- # stream part.
- topo = rows[0]["topological_ordering"]
- toke = rows[0]["stream_ordering"] - 1
- start_token = str(RoomStreamToken(topo, toke))
+ end_token = RoomStreamToken.parse(end_token)
- token = (start_token, str(end_token))
- else:
- token = (str(end_token), str(end_token))
+ rows, token = yield self.runInteraction(
+ "get_recent_event_ids_for_room", self._paginate_room_events_txn,
+ room_id, from_token=end_token, limit=limit,
+ )
- return rows, token
+ # We want to return the results in ascending order.
+ rows.reverse()
- return self.runInteraction(
- "get_recent_events_for_room", get_recent_events_for_room_txn
- )
+ defer.returnValue((rows, token))
def get_room_event_after_stream_ordering(self, room_id, stream_ordering):
"""Gets details of the first event in a room at or after a stream ordering
@@ -517,10 +504,20 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
@staticmethod
def _set_before_and_after(events, rows, topo_order=True):
+ """Inserts ordering information to events' internal metadata from
+ the DB rows.
+
+ Args:
+ events (list[FrozenEvent])
+ rows (list[_EventDictReturn])
+ topo_order (bool): Whether the events were ordered topologically
+ or by stream ordering. If true then all rows should have a non
+ null topological_ordering.
+ """
for event, row in zip(events, rows):
- stream = row["stream_ordering"]
- if topo_order:
- topo = event.depth
+ stream = row.stream_ordering
+ if topo_order and row.topological_ordering:
+ topo = row.topological_ordering
else:
topo = None
internal = event.internal_metadata
@@ -592,87 +589,27 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
retcols=["stream_ordering", "topological_ordering"],
)
- token = RoomStreamToken(
- results["topological_ordering"],
+ # Paginating backwards includes the event at the token, but paginating
+ # forward doesn't.
+ before_token = RoomStreamToken(
+ results["topological_ordering"] - 1,
results["stream_ordering"],
)
- if isinstance(self.database_engine, Sqlite3Engine):
- # SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)``
- # So we give pass it to SQLite3 as the UNION ALL of the two queries.
-
- query_before = (
- "SELECT topological_ordering, stream_ordering, event_id FROM events"
- " WHERE room_id = ? AND topological_ordering < ?"
- " UNION ALL"
- " SELECT topological_ordering, stream_ordering, event_id FROM events"
- " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?"
- " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
- )
- before_args = (
- room_id, token.topological,
- room_id, token.topological, token.stream,
- before_limit,
- )
-
- query_after = (
- "SELECT topological_ordering, stream_ordering, event_id FROM events"
- " WHERE room_id = ? AND topological_ordering > ?"
- " UNION ALL"
- " SELECT topological_ordering, stream_ordering, event_id FROM events"
- " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?"
- " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
- )
- after_args = (
- room_id, token.topological,
- room_id, token.topological, token.stream,
- after_limit,
- )
- else:
- query_before = (
- "SELECT topological_ordering, stream_ordering, event_id FROM events"
- " WHERE room_id = ? AND %s"
- " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
- ) % (upper_bound(token, self.database_engine, inclusive=False),)
-
- before_args = (room_id, before_limit)
-
- query_after = (
- "SELECT topological_ordering, stream_ordering, event_id FROM events"
- " WHERE room_id = ? AND %s"
- " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
- ) % (lower_bound(token, self.database_engine, inclusive=False),)
-
- after_args = (room_id, after_limit)
-
- txn.execute(query_before, before_args)
-
- rows = self.cursor_to_dict(txn)
- events_before = [r["event_id"] for r in rows]
-
- if rows:
- start_token = str(RoomStreamToken(
- rows[0]["topological_ordering"],
- rows[0]["stream_ordering"] - 1,
- ))
- else:
- start_token = str(RoomStreamToken(
- token.topological,
- token.stream - 1,
- ))
-
- txn.execute(query_after, after_args)
+ after_token = RoomStreamToken(
+ results["topological_ordering"],
+ results["stream_ordering"],
+ )
- rows = self.cursor_to_dict(txn)
- events_after = [r["event_id"] for r in rows]
+ rows, start_token = self._paginate_room_events_txn(
+ txn, room_id, before_token, direction='b', limit=before_limit,
+ )
+ events_before = [r.event_id for r in rows]
- if rows:
- end_token = str(RoomStreamToken(
- rows[-1]["topological_ordering"],
- rows[-1]["stream_ordering"],
- ))
- else:
- end_token = str(token)
+ rows, end_token = self._paginate_room_events_txn(
+ txn, room_id, after_token, direction='f', limit=after_limit,
+ )
+ events_after = [r.event_id for r in rows]
return {
"before": {
@@ -735,17 +672,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
def has_room_changed_since(self, room_id, stream_id):
return self._events_stream_cache.has_entity_changed(room_id, stream_id)
+ def _paginate_room_events_txn(self, txn, room_id, from_token, to_token=None,
+ direction='b', limit=-1, event_filter=None):
+ """Returns list of events before or after a given token.
-class StreamStore(StreamWorkerStore):
- def get_room_max_stream_ordering(self):
- return self._stream_id_gen.get_current_token()
-
- def get_room_min_stream_ordering(self):
- return self._backfill_id_gen.get_current_token()
+ Args:
+ txn
+ room_id (str)
+ from_token (RoomStreamToken): The token used to stream from
+ to_token (RoomStreamToken|None): A token which if given limits the
+ results to only those before
+ direction(char): Either 'b' or 'f' to indicate whether we are
+ paginating forwards or backwards from `from_key`.
+ limit (int): The maximum number of events to return. Zero or less
+ means no limit.
+ event_filter (Filter|None): If provided filters the events to
+ those that match the filter.
- @defer.inlineCallbacks
- def paginate_room_events(self, room_id, from_key, to_key=None,
- direction='b', limit=-1, event_filter=None):
+ Returns:
+ Deferred[tuple[list[_EventDictReturn], str]]: Returns the results
+ as a list of _EventDictReturn and a token that points to the end
+ of the result set.
+ """
# Tokens really represent positions between elements, but we use
# the convention of pointing to the event before the gap. Hence
# we have a bit of asymmetry when it comes to equalities.
@@ -753,20 +701,20 @@ class StreamStore(StreamWorkerStore):
if direction == 'b':
order = "DESC"
bounds = upper_bound(
- RoomStreamToken.parse(from_key), self.database_engine
+ from_token, self.database_engine
)
- if to_key:
+ if to_token:
bounds = "%s AND %s" % (bounds, lower_bound(
- RoomStreamToken.parse(to_key), self.database_engine
+ to_token, self.database_engine
))
else:
order = "ASC"
bounds = lower_bound(
- RoomStreamToken.parse(from_key), self.database_engine
+ from_token, self.database_engine
)
- if to_key:
+ if to_token:
bounds = "%s AND %s" % (bounds, upper_bound(
- RoomStreamToken.parse(to_key), self.database_engine
+ to_token, self.database_engine
))
filter_clause, filter_args = filter_to_clause(event_filter)
@@ -782,7 +730,8 @@ class StreamStore(StreamWorkerStore):
limit_str = ""
sql = (
- "SELECT * FROM events"
+ "SELECT event_id, topological_ordering, stream_ordering"
+ " FROM events"
" WHERE outlier = ? AND room_id = ? AND %(bounds)s"
" ORDER BY topological_ordering %(order)s,"
" stream_ordering %(order)s %(limit)s"
@@ -792,35 +741,72 @@ class StreamStore(StreamWorkerStore):
"limit": limit_str
}
- def f(txn):
- txn.execute(sql, args)
-
- rows = self.cursor_to_dict(txn)
-
- if rows:
- topo = rows[-1]["topological_ordering"]
- toke = rows[-1]["stream_ordering"]
- if direction == 'b':
- # Tokens are positions between events.
- # This token points *after* the last event in the chunk.
- # We need it to point to the event before it in the chunk
- # when we are going backwards so we subtract one from the
- # stream part.
- toke -= 1
- next_token = str(RoomStreamToken(topo, toke))
- else:
- # TODO (erikj): We should work out what to do here instead.
- next_token = to_key if to_key else from_key
+ txn.execute(sql, args)
+
+ rows = [_EventDictReturn(row[0], row[1], row[2]) for row in txn]
+
+ if rows:
+ topo = rows[-1].topological_ordering
+ toke = rows[-1].stream_ordering
+ if direction == 'b':
+ # Tokens are positions between events.
+ # This token points *after* the last event in the chunk.
+ # We need it to point to the event before it in the chunk
+ # when we are going backwards so we subtract one from the
+ # stream part.
+ toke -= 1
+ next_token = RoomStreamToken(topo, toke)
+ else:
+ # TODO (erikj): We should work out what to do here instead.
+ next_token = to_token if to_token else from_token
+
+ return rows, str(next_token),
+
+ @defer.inlineCallbacks
+ def paginate_room_events(self, room_id, from_key, to_key=None,
+ direction='b', limit=-1, event_filter=None):
+ """Returns list of events before or after a given token.
- return rows, next_token,
+ Args:
+ room_id (str)
+ from_key (str): The token used to stream from
+ to_key (str|None): A token which if given limits the results to
+ only those before
+ direction(char): Either 'b' or 'f' to indicate whether we are
+ paginating forwards or backwards from `from_key`.
+ limit (int): The maximum number of events to return. Zero or less
+ means no limit.
+ event_filter (Filter|None): If provided filters the events to
+ those that match the filter.
- rows, token = yield self.runInteraction("paginate_room_events", f)
+ Returns:
+ tuple[list[dict], str]: Returns the results as a list of dicts and
+ a token that points to the end of the result set. The dicts have
+ the keys "event_id", "topological_ordering" and "stream_orderign".
+ """
+
+ from_key = RoomStreamToken.parse(from_key)
+ if to_key:
+ to_key = RoomStreamToken.parse(to_key)
+
+ rows, token = yield self.runInteraction(
+ "paginate_room_events", self._paginate_room_events_txn,
+ room_id, from_key, to_key, direction, limit, event_filter,
+ )
events = yield self._get_events(
- [r["event_id"] for r in rows],
+ [r.event_id for r in rows],
get_prev_content=True
)
self._set_before_and_after(events, rows)
defer.returnValue((events, token))
+
+
+class StreamStore(StreamWorkerStore):
+ def get_room_max_stream_ordering(self):
+ return self._stream_id_gen.get_current_token()
+
+ def get_room_min_stream_ordering(self):
+ return self._backfill_id_gen.get_current_token()
diff --git a/synapse/storage/tags.py b/synapse/storage/tags.py
index 13bff9f055..6671d3cfca 100644
--- a/synapse/storage/tags.py
+++ b/synapse/storage/tags.py
@@ -22,6 +22,8 @@ from twisted.internet import defer
import simplejson as json
import logging
+from six.moves import range
+
logger = logging.getLogger(__name__)
@@ -98,7 +100,7 @@ class TagsWorkerStore(AccountDataWorkerStore):
batch_size = 50
results = []
- for i in xrange(0, len(tag_ids), batch_size):
+ for i in range(0, len(tag_ids), batch_size):
tags = yield self.runInteraction(
"get_all_updated_tag_content",
get_tag_content,
|