diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 32c6677d47..d766a30299 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -807,6 +807,11 @@ class SQLBaseStore(object):
if txn.rowcount > 1:
raise StoreError(500, "more than one row matched")
+ def _simple_delete(self, table, keyvalues, desc):
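+ """Executes a DELETE query on the named table, deleting any rows
+ matching the given key/value pairs. Runs in its own transaction,
+ named `desc` for logging and metrics.
+ """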
+ return self.runInteraction(
+ desc, self._simple_delete_txn, table, keyvalues
+ )
+
@staticmethod
def _simple_delete_txn(txn, table, keyvalues):
sql = "DELETE FROM %s WHERE %s" % (
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 940e11d7a2..3d93285f84 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -16,6 +16,8 @@
from ._base import SQLBaseStore
from twisted.internet import defer
from synapse.util.caches.descriptors import cachedInlineCallbacks
+from synapse.types import RoomStreamToken
+from .stream import lower_bound
import logging
import ujson as json
@@ -73,6 +75,9 @@ class EventPushActionsStore(SQLBaseStore):
stream_ordering = results[0][0]
topological_ordering = results[0][1]
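+ # Build a stream token for the read receipt's position so we can
+ # reuse the engine-aware lower_bound() helper below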
+ token = RoomStreamToken(
+ topological_ordering, stream_ordering
+ )
sql = (
"SELECT sum(notif), sum(highlight)"
@@ -80,15 +85,10 @@ class EventPushActionsStore(SQLBaseStore):
" WHERE"
" user_id = ?"
" AND room_id = ?"
- " AND ("
- " topological_ordering > ?"
- " OR (topological_ordering = ? AND stream_ordering > ?)"
- ")"
- )
- txn.execute(sql, (
- user_id, room_id,
- topological_ordering, topological_ordering, stream_ordering
- ))
+ " AND %s"
+ ) % (lower_bound(token, self.database_engine, inclusive=False),)
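+ # lower_bound() interpolates the token's ordering values directly
+ # into the SQL, so only user_id and room_id remain as bind parameters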
+
+ txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
if row:
return {
@@ -152,7 +152,7 @@ class EventPushActionsStore(SQLBaseStore):
if max_stream_ordering is not None:
sql += " AND ep.stream_ordering <= ?"
args.append(max_stream_ordering)
- sql += " ORDER BY ep.stream_ordering ASC LIMIT ?"
+ sql += " ORDER BY ep.stream_ordering DESC LIMIT ?"
args.append(limit)
txn.execute(sql, args)
return txn.fetchall()
@@ -176,14 +176,16 @@ class EventPushActionsStore(SQLBaseStore):
if max_stream_ordering is not None:
sql += " AND ep.stream_ordering <= ?"
args.append(max_stream_ordering)
- sql += " ORDER BY ep.stream_ordering ASC"
+ sql += " ORDER BY ep.stream_ordering DESC LIMIT ?"
+ args.append(limit)
txn.execute(sql, args)
return txn.fetchall()
no_read_receipt = yield self.runInteraction(
"get_unread_push_actions_for_user_in_range", get_no_receipt
)
- defer.returnValue([
+ # Make a list of dicts from the two sets of results.
+ notifs = [
{
"event_id": row[0],
"room_id": row[1],
@@ -191,7 +193,16 @@ class EventPushActionsStore(SQLBaseStore):
"actions": json.loads(row[3]),
"received_ts": row[4],
} for row in after_read_receipt + no_read_receipt
- ])
+ ]
+
+ # Now sort it so it's ordered correctly: at this point it contains
+ # the results of the first query, correctly ordered, followed by the
+ # results of the second query, but we want the combined list ordered
+ # by received_ts (most recent first)
+ notifs.sort(key=lambda r: -(r['received_ts'] or 0))
+
+ # Now return the first `limit`
+ defer.returnValue(notifs[:limit])
@defer.inlineCallbacks
def get_time_of_last_push_action_before(self, stream_ordering):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 6d978ffcd5..b582942164 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -23,6 +23,7 @@ from synapse.util.async import ObservableDeferred
from synapse.util.logcontext import preserve_fn, PreserveLoggingContext
from synapse.util.logutils import log_function
from synapse.api.constants import EventTypes
+from synapse.api.errors import SynapseError
from canonicaljson import encode_canonical_json
from collections import deque, namedtuple
@@ -355,7 +356,6 @@ class EventsStore(SQLBaseStore):
txn.call_after(self.get_rooms_for_user.invalidate_all)
txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
txn.call_after(self.get_joined_hosts_for_room.invalidate, (event.room_id,))
- txn.call_after(self.get_room_name_and_aliases.invalidate, (event.room_id,))
# Add an entry to the current_state_resets table to record the point
# where we clobbered the current state
@@ -666,12 +666,6 @@ class EventsStore(SQLBaseStore):
(event.room_id, event.type, event.state_key,)
)
- if event.type in [EventTypes.Name, EventTypes.Aliases]:
- txn.call_after(
- self.get_room_name_and_aliases.invalidate,
- (event.room_id,)
- )
-
self._simple_upsert_txn(
txn,
"current_state_events",
@@ -1288,6 +1282,156 @@ class EventsStore(SQLBaseStore):
)
return self.runInteraction("get_all_new_events", get_all_new_events_txn)
+ def delete_old_state(self, room_id, topological_ordering):
+ return self.runInteraction(
+ "delete_old_state",
+ self._delete_old_state_txn, room_id, topological_ordering
+ )
+
+ def _delete_old_state_txn(self, txn, room_id, topological_ordering):
+ """Deletes old room state
+ """
+
+ # Tables that should be pruned:
+ # event_auth
+ # event_backward_extremities
+ # event_content_hashes
+ # event_destinations
+ # event_edge_hashes
+ # event_edges
+ # event_forward_extremities
+ # event_json
+ # event_push_actions
+ # event_reference_hashes
+ # event_search
+ # event_signatures
+ # event_to_state_groups
+ # events
+ # rejections
+ # room_depth
+ # state_groups
+ # state_groups_state
+
+ # First ensure that we're not about to delete all the forward extremities
+ txn.execute(
+ "SELECT e.event_id, e.depth FROM events as e "
+ "INNER JOIN event_forward_extremities as f "
+ "ON e.event_id = f.event_id "
+ "AND e.room_id = f.room_id "
+ "WHERE f.room_id = ?",
+ (room_id,)
+ )
+ rows = txn.fetchall()
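+ # rows are (event_id, depth) tuples; the deepest forward extremity
+ # bounds how much history we may purge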
+ max_depth = max(row[1] for row in rows)
+
+ if max_depth <= topological_ordering:
+ # We need to ensure we don't delete all the events from the database
+ # otherwise we wouldn't be able to send any events (due to not
+ # having any forward extremities left)
+ raise SynapseError(
+ 400, "topological_ordering is greater than forward extremeties"
+ )
+
+ txn.execute(
+ "SELECT event_id, state_key FROM events"
+ " LEFT JOIN state_events USING (room_id, event_id)"
+ " WHERE room_id = ? AND topological_ordering < ?",
+ (room_id, topological_ordering,)
+ )
+ event_rows = txn.fetchall()
+
+ # We calculate the new entries for the backward extremities by finding
+ # all events that point to events that are to be purged
+ txn.execute(
+ "SELECT e.event_id FROM events as e"
+ " INNER JOIN event_edges as ed ON e.event_id = ed.prev_event_id"
+ " INNER JOIN events as e2 ON e2.event_id = ed.event_id"
+ " WHERE e.room_id = ? AND e.topological_ordering < ?"
+ " AND e2.topological_ordering >= ?",
+ (room_id, topological_ordering, topological_ordering)
+ )
+ new_backwards_extrems = txn.fetchall()
+
+ # Get all state groups that are only referenced by events that are
+ # to be deleted.
+ txn.execute(
+ "SELECT state_group FROM event_to_state_groups"
+ " INNER JOIN events USING (event_id)"
+ " WHERE state_group IN ("
+ " SELECT DISTINCT state_group FROM events"
+ " INNER JOIN event_to_state_groups USING (event_id)"
+ " WHERE room_id = ? AND topological_ordering < ?"
+ " )"
+ " GROUP BY state_group HAVING MAX(topological_ordering) < ?",
+ (room_id, topological_ordering, topological_ordering)
+ )
+ state_rows = txn.fetchall()
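+ # Delete the now-unreferenced state groups, along with their state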
+ txn.executemany(
+ "DELETE FROM state_groups_state WHERE state_group = ?",
+ state_rows
+ )
+ txn.executemany(
+ "DELETE FROM state_groups WHERE id = ?",
+ state_rows
+ )
+ # Remove the event_to_state_groups mappings for all events being purged
+ txn.executemany(
+ "DELETE FROM event_to_state_groups WHERE event_id = ?",
+ [(event_id,) for event_id, _ in event_rows]
+ )
+
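+ # Raise the room's min_depth to the purge point so that we don't try
+ # to backfill the purged history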
+ txn.execute(
+ "UPDATE room_depth SET min_depth = ? WHERE room_id = ?",
+ (topological_ordering, room_id,)
+ )
+
+ # Delete all remote non-state events
+ to_delete = [
+ (event_id,) for event_id, state_key in event_rows
+ if state_key is None and not self.hs.is_mine_id(event_id)
+ ]
+ for table in (
+ "events",
+ "event_json",
+ "event_auth",
+ "event_content_hashes",
+ "event_destinations",
+ "event_edge_hashes",
+ "event_edges",
+ "event_forward_extremities",
+ "event_push_actions",
+ "event_reference_hashes",
+ "event_search",
+ "event_signatures",
+ "rejections",
+ "event_backward_extremities",
+ ):
+ txn.executemany(
+ "DELETE FROM %s WHERE event_id = ?" % (table,),
+ to_delete
+ )
+
+ # Update backward extremities
+ txn.executemany(
+ "INSERT INTO event_backward_extremities (room_id, event_id)"
+ " VALUES (?, ?)",
+ [(room_id, event_id) for event_id, in new_backwards_extrems]
+ )
+
+ txn.executemany(
+ "DELETE FROM events WHERE event_id = ?",
+ to_delete
+ )
+ # Mark all state and own events as outliers
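+ # (they are kept as outliers so that state resolution and membership
+ # queries still work, without the events appearing in the room's
+ # history)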
+ txn.executemany(
+ "UPDATE events SET outlier = ?"
+ " WHERE event_id = ?",
+ [
+ (True, event_id,) for event_id, state_key in event_rows
+ if state_key is not None or self.hs.is_mine_id(event_id)
+ ]
+ )
+
AllNewEventsResult = namedtuple("AllNewEventsResult", [
"new_forward_events", "new_backfill_events",
diff --git a/synapse/storage/media_repository.py b/synapse/storage/media_repository.py
index a820fcf07f..4c0f82353d 100644
--- a/synapse/storage/media_repository.py
+++ b/synapse/storage/media_repository.py
@@ -157,10 +157,25 @@ class MediaRepositoryStore(SQLBaseStore):
"created_ts": time_now_ms,
"upload_name": upload_name,
"filesystem_id": filesystem_id,
+ "last_access_ts": time_now_ms,
},
desc="store_cached_remote_media",
)
+ def update_cached_last_access_time(self, origin_id_tuples, time_ts):
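+ """Bulk-updates the last_access_ts (in ms) of the given
+ (media_origin, media_id) pairs in the remote media cache.
+ """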
+ def update_cache_txn(txn):
+ sql = (
+ "UPDATE remote_media_cache SET last_access_ts = ?"
+ " WHERE media_origin = ? AND media_id = ?"
+ )
+
+ txn.executemany(sql, (
+ (time_ts, media_origin, media_id)
+ for media_origin, media_id in origin_id_tuples
+ ))
+
+ return self.runInteraction("update_cached_last_access_time", update_cache_txn)
+
def get_remote_media_thumbnails(self, origin, media_id):
return self._simple_select_list(
"remote_media_cache_thumbnails",
@@ -190,3 +205,32 @@ class MediaRepositoryStore(SQLBaseStore):
},
desc="store_remote_media_thumbnail",
)
+
+ def get_remote_media_before(self, before_ts):
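+ """Fetches all remote media that was last accessed strictly before
+ the given timestamp (in ms), e.g. so the files can be expired from
+ the local cache.
+ """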
+ sql = (
+ "SELECT media_origin, media_id, filesystem_id"
+ " FROM remote_media_cache"
+ " WHERE last_access_ts < ?"
+ )
+
+ return self._execute(
+ "get_remote_media_before", self.cursor_to_dict, sql, before_ts
+ )
+
+ def delete_remote_media(self, media_origin, media_id):
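+ """Deletes the cache rows for the given remote media and its
+ thumbnails in a single transaction.
+ """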
+ def delete_remote_media_txn(txn):
+ self._simple_delete_txn(
+ txn,
+ "remote_media_cache",
+ keyvalues={
+ "media_origin": media_origin, "media_id": media_id
+ },
+ )
+ self._simple_delete_txn(
+ txn,
+ "remote_media_cache_thumbnails",
+ keyvalues={
+ "media_origin": media_origin, "media_id": media_id
+ },
+ )
+ return self.runInteraction("delete_remote_media", delete_remote_media_txn)
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index c8487c8838..8801669a6b 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 32
+SCHEMA_VERSION = 33
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 3de9e0f709..0a68341494 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -77,7 +77,7 @@ class RegistrationStore(SQLBaseStore):
@defer.inlineCallbacks
def register(self, user_id, token, password_hash,
was_guest=False, make_guest=False, appservice_id=None,
- create_profile_with_localpart=None):
+ create_profile_with_localpart=None, admin=False):
"""Attempts to register an account.
Args:
@@ -104,6 +104,7 @@ class RegistrationStore(SQLBaseStore):
make_guest,
appservice_id,
create_profile_with_localpart,
+ admin
)
self.get_user_by_id.invalidate((user_id,))
self.is_guest.invalidate((user_id,))
@@ -118,6 +119,7 @@ class RegistrationStore(SQLBaseStore):
make_guest,
appservice_id,
create_profile_with_localpart,
+ admin,
):
now = int(self.clock.time())
@@ -125,29 +127,33 @@ class RegistrationStore(SQLBaseStore):
try:
if was_guest:
- txn.execute("UPDATE users SET"
- " password_hash = ?,"
- " upgrade_ts = ?,"
- " is_guest = ?"
- " WHERE name = ?",
- [password_hash, now, 1 if make_guest else 0, user_id])
+ self._simple_update_one_txn(
+ txn,
+ "users",
+ keyvalues={
+ "name": user_id,
+ },
+ updatevalues={
+ "password_hash": password_hash,
+ "upgrade_ts": now,
+ "is_guest": 1 if make_guest else 0,
+ "appservice_id": appservice_id,
+ "admin": 1 if admin else 0,
+ }
+ )
else:
- txn.execute("INSERT INTO users "
- "("
- " name,"
- " password_hash,"
- " creation_ts,"
- " is_guest,"
- " appservice_id"
- ") "
- "VALUES (?,?,?,?,?)",
- [
- user_id,
- password_hash,
- now,
- 1 if make_guest else 0,
- appservice_id,
- ])
+ self._simple_insert_txn(
+ txn,
+ "users",
+ values={
+ "name": user_id,
+ "password_hash": password_hash,
+ "creation_ts": now,
+ "is_guest": 1 if make_guest else 0,
+ "appservice_id": appservice_id,
+ "admin": 1 if admin else 0,
+ }
+ )
except self.database_engine.module.IntegrityError:
raise StoreError(
400, "User ID already taken.", errcode=Codes.USER_IN_USE
@@ -384,6 +390,15 @@ class RegistrationStore(SQLBaseStore):
defer.returnValue(ret['user_id'])
defer.returnValue(None)
+ def user_delete_threepids(self, user_id):
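+ """Deletes all third party identifiers associated with the given
+ user.
+ """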
+ return self._simple_delete(
+ "user_threepids",
+ keyvalues={
+ "user_id": user_id,
+ },
+ desc="user_delete_threepids",
+ )
+
@defer.inlineCallbacks
def count_all_users(self):
"""Counts all users registered on the homeserver."""
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 97f9f1929c..8251f58670 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -18,7 +18,6 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from ._base import SQLBaseStore
-from synapse.util.caches.descriptors import cachedInlineCallbacks
from .engines import PostgresEngine, Sqlite3Engine
import collections
@@ -192,49 +191,6 @@ class RoomStore(SQLBaseStore):
# This should be unreachable.
raise Exception("Unrecognized database engine")
- @cachedInlineCallbacks()
- def get_room_name_and_aliases(self, room_id):
- def get_room_name(txn):
- sql = (
- "SELECT name FROM room_names"
- " INNER JOIN current_state_events USING (room_id, event_id)"
- " WHERE room_id = ?"
- " LIMIT 1"
- )
-
- txn.execute(sql, (room_id,))
- rows = txn.fetchall()
- if rows:
- return rows[0][0]
- else:
- return None
-
- return [row[0] for row in txn.fetchall()]
-
- def get_room_aliases(txn):
- sql = (
- "SELECT content FROM current_state_events"
- " INNER JOIN events USING (room_id, event_id)"
- " WHERE room_id = ?"
- )
- txn.execute(sql, (room_id,))
- return [row[0] for row in txn.fetchall()]
-
- name = yield self.runInteraction("get_room_name", get_room_name)
- alias_contents = yield self.runInteraction("get_room_aliases", get_room_aliases)
-
- aliases = []
-
- for c in alias_contents:
- try:
- content = json.loads(c)
- except:
- continue
-
- aliases.extend(content.get('aliases', []))
-
- defer.returnValue((name, aliases))
-
def add_event_report(self, room_id, event_id, user_id, reason, content,
received_ts):
next_id = self._event_reports_id_gen.get_next()
diff --git a/synapse/storage/schema/delta/33/remote_media_ts.py b/synapse/storage/schema/delta/33/remote_media_ts.py
new file mode 100644
index 0000000000..55ae43f395
--- /dev/null
+++ b/synapse/storage/schema/delta/33/remote_media_ts.py
@@ -0,0 +1,31 @@
+# Copyright 2016 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+
+
+ALTER_TABLE = "ALTER TABLE remote_media_cache ADD COLUMN last_access_ts BIGINT"
+
+
+def run_create(cur, database_engine, *args, **kwargs):
+ cur.execute(ALTER_TABLE)
+
+
+def run_upgrade(cur, database_engine, *args, **kwargs):
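+ # Existing cache entries predate the new column; stamp them with the
+ # current time so they aren't treated as immediately expired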
+ cur.execute(
+ database_engine.convert_param_style(
+ "UPDATE remote_media_cache SET last_access_ts = ?"
+ ),
+ (int(time.time() * 1000),)
+ )
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index b9ad965fd6..c33ac5a8d7 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -40,6 +40,7 @@ from synapse.util.caches.descriptors import cached
from synapse.api.constants import EventTypes
from synapse.types import RoomStreamToken
from synapse.util.logcontext import preserve_fn
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
import logging
@@ -54,25 +55,43 @@ _STREAM_TOKEN = "stream"
_TOPOLOGICAL_TOKEN = "topological"
-def lower_bound(token):
+def lower_bound(token, engine, inclusive=False):
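+ """Returns an SQL clause matching rows after (or, if inclusive=True,
+ at) the given RoomStreamToken, in a form suited to the engine.
+ """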
+ inclusive = "=" if inclusive else ""
if token.topological is None:
- return "(%d < %s)" % (token.stream, "stream_ordering")
+ return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering")
else:
- return "(%d < %s OR (%d = %s AND %d < %s))" % (
+ if isinstance(engine, PostgresEngine):
+ # Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well
+ # as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we
+ # use the latter form when running against postgres.
+ return "((%d,%d) <%s (%s,%s))" % (
+ token.topological, token.stream, inclusive,
+ "topological_ordering", "stream_ordering",
+ )
+ return "(%d < %s OR (%d = %s AND %d <%s %s))" % (
token.topological, "topological_ordering",
token.topological, "topological_ordering",
- token.stream, "stream_ordering",
+ token.stream, inclusive, "stream_ordering",
)
-def upper_bound(token):
+def upper_bound(token, engine, inclusive=True):
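+ """Returns an SQL clause matching rows before (or, if inclusive=True,
+ at) the given RoomStreamToken, in a form suited to the engine.
+ """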
+ inclusive = "=" if inclusive else ""
if token.topological is None:
- return "(%d >= %s)" % (token.stream, "stream_ordering")
+ return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering")
else:
- return "(%d > %s OR (%d = %s AND %d >= %s))" % (
+ if isinstance(engine, PostgresEngine):
+ # Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well
+ # as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we
+ # use the latter form when running against postgres.
+ return "((%d,%d) >%s (%s,%s))" % (
+ token.topological, token.stream, inclusive,
+ "topological_ordering", "stream_ordering",
+ )
+ return "(%d > %s OR (%d = %s AND %d >%s %s))" % (
token.topological, "topological_ordering",
token.topological, "topological_ordering",
- token.stream, "stream_ordering",
+ token.stream, inclusive, "stream_ordering",
)
@@ -308,18 +327,22 @@ class StreamStore(SQLBaseStore):
args = [False, room_id]
if direction == 'b':
order = "DESC"
- bounds = upper_bound(RoomStreamToken.parse(from_key))
+ bounds = upper_bound(
+ RoomStreamToken.parse(from_key), self.database_engine
+ )
if to_key:
- bounds = "%s AND %s" % (
- bounds, lower_bound(RoomStreamToken.parse(to_key))
- )
+ bounds = "%s AND %s" % (bounds, lower_bound(
+ RoomStreamToken.parse(to_key), self.database_engine
+ ))
else:
order = "ASC"
- bounds = lower_bound(RoomStreamToken.parse(from_key))
+ bounds = lower_bound(
+ RoomStreamToken.parse(from_key), self.database_engine
+ )
if to_key:
- bounds = "%s AND %s" % (
- bounds, upper_bound(RoomStreamToken.parse(to_key))
- )
+ bounds = "%s AND %s" % (bounds, upper_bound(
+ RoomStreamToken.parse(to_key), self.database_engine
+ ))
if int(limit) > 0:
args.append(int(limit))
@@ -487,13 +510,13 @@ class StreamStore(SQLBaseStore):
row["topological_ordering"], row["stream_ordering"],)
)
- def get_max_topological_token_for_stream_and_room(self, room_id, stream_key):
+ def get_max_topological_token(self, room_id, stream_key):
sql = (
"SELECT max(topological_ordering) FROM events"
" WHERE room_id = ? AND stream_ordering < ?"
)
return self._execute(
- "get_max_topological_token_for_stream_and_room", None,
+ "get_max_topological_token", None,
sql, room_id, stream_key,
).addCallback(
lambda r: r[0][0] if r else 0
@@ -586,32 +609,60 @@ class StreamStore(SQLBaseStore):
retcols=["stream_ordering", "topological_ordering"],
)
- stream_ordering = results["stream_ordering"]
- topological_ordering = results["topological_ordering"]
-
- query_before = (
- "SELECT topological_ordering, stream_ordering, event_id FROM events"
- " WHERE room_id = ? AND (topological_ordering < ?"
- " OR (topological_ordering = ? AND stream_ordering < ?))"
- " ORDER BY topological_ordering DESC, stream_ordering DESC"
- " LIMIT ?"
+ token = RoomStreamToken(
+ results["topological_ordering"],
+ results["stream_ordering"],
)
- query_after = (
- "SELECT topological_ordering, stream_ordering, event_id FROM events"
- " WHERE room_id = ? AND (topological_ordering > ?"
- " OR (topological_ordering = ? AND stream_ordering > ?))"
- " ORDER BY topological_ordering ASC, stream_ordering ASC"
- " LIMIT ?"
- )
+ if isinstance(self.database_engine, Sqlite3Engine):
+ # SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)`` well,
+ # so we pass it to SQLite3 as the UNION ALL of the two queries.
+
+ query_before = (
+ "SELECT topological_ordering, stream_ordering, event_id FROM events"
+ " WHERE room_id = ? AND topological_ordering < ?"
+ " UNION ALL"
+ " SELECT topological_ordering, stream_ordering, event_id FROM events"
+ " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?"
+ " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
+ )
+ before_args = (
+ room_id, token.topological,
+ room_id, token.topological, token.stream,
+ before_limit,
+ )
- txn.execute(
- query_before,
- (
- room_id, topological_ordering, topological_ordering,
- stream_ordering, before_limit,
+ query_after = (
+ "SELECT topological_ordering, stream_ordering, event_id FROM events"
+ " WHERE room_id = ? AND topological_ordering > ?"
+ " UNION ALL"
+ " SELECT topological_ordering, stream_ordering, event_id FROM events"
+ " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?"
+ " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
)
- )
+ after_args = (
+ room_id, token.topological,
+ room_id, token.topological, token.stream,
+ after_limit,
+ )
+ else:
+ query_before = (
+ "SELECT topological_ordering, stream_ordering, event_id FROM events"
+ " WHERE room_id = ? AND %s"
+ " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?"
+ ) % (upper_bound(token, self.database_engine, inclusive=False),)
+
+ before_args = (room_id, before_limit)
+
+ query_after = (
+ "SELECT topological_ordering, stream_ordering, event_id FROM events"
+ " WHERE room_id = ? AND %s"
+ " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?"
+ ) % (lower_bound(token, self.database_engine, inclusive=False),)
+
+ after_args = (room_id, after_limit)
+
+ txn.execute(query_before, before_args)
rows = self.cursor_to_dict(txn)
events_before = [r["event_id"] for r in rows]
@@ -623,17 +674,11 @@ class StreamStore(SQLBaseStore):
))
else:
start_token = str(RoomStreamToken(
- topological_ordering,
- stream_ordering - 1,
+ token.topological,
+ token.stream - 1,
))
- txn.execute(
- query_after,
- (
- room_id, topological_ordering, topological_ordering,
- stream_ordering, after_limit,
- )
- )
+ txn.execute(query_after, after_args)
rows = self.cursor_to_dict(txn)
events_after = [r["event_id"] for r in rows]
@@ -644,10 +689,7 @@ class StreamStore(SQLBaseStore):
rows[-1]["stream_ordering"],
))
else:
- end_token = str(RoomStreamToken(
- topological_ordering,
- stream_ordering,
- ))
+ end_token = str(token)
return {
"before": {
|