diff options
author | Erik Johnston <erik@matrix.org> | 2016-07-06 14:46:31 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2016-07-06 14:46:31 +0100 |
commit | a17e7caeb708f267e8ac2b18404a098e90792834 (patch) | |
tree | 2dee0e30691c1a9fd30ee9d674c8c5a6cfe7e7de /synapse/storage | |
parent | Add ReadWriteLock for pagination and history prune (diff) | |
parent | Check that there are no null bytes in user and passsword (diff) | |
download | synapse-a17e7caeb708f267e8ac2b18404a098e90792834.tar.xz |
Merge branch 'erikj/shared_secret' into erikj/test2
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/event_push_actions.py | 18 | ||||
-rw-r--r-- | synapse/storage/registration.py | 52 | ||||
-rw-r--r-- | synapse/storage/stream.py | 144 |
3 files changed, 131 insertions, 83 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 5f1b6f63a9..3d93285f84 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -16,6 +16,8 @@ from ._base import SQLBaseStore from twisted.internet import defer from synapse.util.caches.descriptors import cachedInlineCallbacks +from synapse.types import RoomStreamToken +from .stream import lower_bound import logging import ujson as json @@ -73,6 +75,9 @@ class EventPushActionsStore(SQLBaseStore): stream_ordering = results[0][0] topological_ordering = results[0][1] + token = RoomStreamToken( + topological_ordering, stream_ordering + ) sql = ( "SELECT sum(notif), sum(highlight)" @@ -80,15 +85,10 @@ class EventPushActionsStore(SQLBaseStore): " WHERE" " user_id = ?" " AND room_id = ?" - " AND (" - " topological_ordering > ?" - " OR (topological_ordering = ? AND stream_ordering > ?)" - ")" - ) - txn.execute(sql, ( - user_id, room_id, - topological_ordering, topological_ordering, stream_ordering - )) + " AND %s" + ) % (lower_bound(token, self.database_engine, inclusive=False),) + + txn.execute(sql, (user_id, room_id)) row = txn.fetchone() if row: return { diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 5c75dbab51..0a68341494 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -77,7 +77,7 @@ class RegistrationStore(SQLBaseStore): @defer.inlineCallbacks def register(self, user_id, token, password_hash, was_guest=False, make_guest=False, appservice_id=None, - create_profile_with_localpart=None): + create_profile_with_localpart=None, admin=False): """Attempts to register an account. Args: @@ -104,6 +104,7 @@ class RegistrationStore(SQLBaseStore): make_guest, appservice_id, create_profile_with_localpart, + admin ) self.get_user_by_id.invalidate((user_id,)) self.is_guest.invalidate((user_id,)) @@ -118,6 +119,7 @@ class RegistrationStore(SQLBaseStore): make_guest, appservice_id, create_profile_with_localpart, + admin, ): now = int(self.clock.time()) @@ -125,29 +127,33 @@ class RegistrationStore(SQLBaseStore): try: if was_guest: - txn.execute("UPDATE users SET" - " password_hash = ?," - " upgrade_ts = ?," - " is_guest = ?" - " WHERE name = ?", - [password_hash, now, 1 if make_guest else 0, user_id]) + self._simple_update_one_txn( + txn, + "users", + keyvalues={ + "name": user_id, + }, + updatevalues={ + "password_hash": password_hash, + "upgrade_ts": now, + "is_guest": 1 if make_guest else 0, + "appservice_id": appservice_id, + "admin": 1 if admin else 0, + } + ) else: - txn.execute("INSERT INTO users " - "(" - " name," - " password_hash," - " creation_ts," - " is_guest," - " appservice_id" - ") " - "VALUES (?,?,?,?,?)", - [ - user_id, - password_hash, - now, - 1 if make_guest else 0, - appservice_id, - ]) + self._simple_insert_txn( + txn, + "users", + values={ + "name": user_id, + "password_hash": password_hash, + "creation_ts": now, + "is_guest": 1 if make_guest else 0, + "appservice_id": appservice_id, + "admin": 1 if admin else 0, + } + ) except self.database_engine.module.IntegrityError: raise StoreError( 400, "User ID already taken.", errcode=Codes.USER_IN_USE diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index 3dda2dab55..c33ac5a8d7 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -40,6 +40,7 @@ from synapse.util.caches.descriptors import cached from synapse.api.constants import EventTypes from synapse.types import RoomStreamToken from synapse.util.logcontext import preserve_fn +from synapse.storage.engines import PostgresEngine, Sqlite3Engine import logging @@ -54,25 +55,43 @@ _STREAM_TOKEN = "stream" _TOPOLOGICAL_TOKEN = "topological" -def lower_bound(token): +def lower_bound(token, engine, inclusive=False): + inclusive = "=" if inclusive else "" if token.topological is None: - return "(%d < %s)" % (token.stream, "stream_ordering") + return "(%d <%s %s)" % (token.stream, inclusive, "stream_ordering") else: - return "(%d < %s OR (%d = %s AND %d < %s))" % ( + if isinstance(engine, PostgresEngine): + # Postgres doesn't optimise ``(x < a) OR (x=a AND y<b)`` as well + # as it optimises ``(x,y) < (a,b)`` on multicolumn indexes. So we + # use the later form when running against postgres. + return "((%d,%d) <%s (%s,%s))" % ( + token.topological, token.stream, inclusive, + "topological_ordering", "stream_ordering", + ) + return "(%d < %s OR (%d = %s AND %d <%s %s))" % ( token.topological, "topological_ordering", token.topological, "topological_ordering", - token.stream, "stream_ordering", + token.stream, inclusive, "stream_ordering", ) -def upper_bound(token): +def upper_bound(token, engine, inclusive=True): + inclusive = "=" if inclusive else "" if token.topological is None: - return "(%d >= %s)" % (token.stream, "stream_ordering") + return "(%d >%s %s)" % (token.stream, inclusive, "stream_ordering") else: - return "(%d > %s OR (%d = %s AND %d >= %s))" % ( + if isinstance(engine, PostgresEngine): + # Postgres doesn't optimise ``(x > a) OR (x=a AND y>b)`` as well + # as it optimises ``(x,y) > (a,b)`` on multicolumn indexes. So we + # use the later form when running against postgres. + return "((%d,%d) >%s (%s,%s))" % ( + token.topological, token.stream, inclusive, + "topological_ordering", "stream_ordering", + ) + return "(%d > %s OR (%d = %s AND %d >%s %s))" % ( token.topological, "topological_ordering", token.topological, "topological_ordering", - token.stream, "stream_ordering", + token.stream, inclusive, "stream_ordering", ) @@ -308,18 +327,22 @@ class StreamStore(SQLBaseStore): args = [False, room_id] if direction == 'b': order = "DESC" - bounds = upper_bound(RoomStreamToken.parse(from_key)) + bounds = upper_bound( + RoomStreamToken.parse(from_key), self.database_engine + ) if to_key: - bounds = "%s AND %s" % ( - bounds, lower_bound(RoomStreamToken.parse(to_key)) - ) + bounds = "%s AND %s" % (bounds, lower_bound( + RoomStreamToken.parse(to_key), self.database_engine + )) else: order = "ASC" - bounds = lower_bound(RoomStreamToken.parse(from_key)) + bounds = lower_bound( + RoomStreamToken.parse(from_key), self.database_engine + ) if to_key: - bounds = "%s AND %s" % ( - bounds, upper_bound(RoomStreamToken.parse(to_key)) - ) + bounds = "%s AND %s" % (bounds, upper_bound( + RoomStreamToken.parse(to_key), self.database_engine + )) if int(limit) > 0: args.append(int(limit)) @@ -586,32 +609,60 @@ class StreamStore(SQLBaseStore): retcols=["stream_ordering", "topological_ordering"], ) - stream_ordering = results["stream_ordering"] - topological_ordering = results["topological_ordering"] - - query_before = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND (topological_ordering < ?" - " OR (topological_ordering = ? AND stream_ordering < ?))" - " ORDER BY topological_ordering DESC, stream_ordering DESC" - " LIMIT ?" + token = RoomStreamToken( + results["topological_ordering"], + results["stream_ordering"], ) - query_after = ( - "SELECT topological_ordering, stream_ordering, event_id FROM events" - " WHERE room_id = ? AND (topological_ordering > ?" - " OR (topological_ordering = ? AND stream_ordering > ?))" - " ORDER BY topological_ordering ASC, stream_ordering ASC" - " LIMIT ?" - ) + if isinstance(self.database_engine, Sqlite3Engine): + # SQLite3 doesn't optimise ``(x < a) OR (x = a AND y < b)`` + # So we give pass it to SQLite3 as the UNION ALL of the two queries. + + query_before = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND topological_ordering < ?" + " UNION ALL" + " SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering < ?" + " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" + ) + before_args = ( + room_id, token.topological, + room_id, token.topological, token.stream, + before_limit, + ) - txn.execute( - query_before, - ( - room_id, topological_ordering, topological_ordering, - stream_ordering, before_limit, + query_after = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND topological_ordering > ?" + " UNION ALL" + " SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND topological_ordering = ? AND stream_ordering > ?" + " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" ) - ) + after_args = ( + room_id, token.topological, + room_id, token.topological, token.stream, + after_limit, + ) + else: + query_before = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND %s" + " ORDER BY topological_ordering DESC, stream_ordering DESC LIMIT ?" + ) % (upper_bound(token, self.database_engine, inclusive=False),) + + before_args = (room_id, before_limit) + + query_after = ( + "SELECT topological_ordering, stream_ordering, event_id FROM events" + " WHERE room_id = ? AND %s" + " ORDER BY topological_ordering ASC, stream_ordering ASC LIMIT ?" + ) % (lower_bound(token, self.database_engine, inclusive=False),) + + after_args = (room_id, after_limit) + + txn.execute(query_before, before_args) rows = self.cursor_to_dict(txn) events_before = [r["event_id"] for r in rows] @@ -623,17 +674,11 @@ class StreamStore(SQLBaseStore): )) else: start_token = str(RoomStreamToken( - topological_ordering, - stream_ordering - 1, + token.topological, + token.stream - 1, )) - txn.execute( - query_after, - ( - room_id, topological_ordering, topological_ordering, - stream_ordering, after_limit, - ) - ) + txn.execute(query_after, after_args) rows = self.cursor_to_dict(txn) events_after = [r["event_id"] for r in rows] @@ -644,10 +689,7 @@ class StreamStore(SQLBaseStore): rows[-1]["stream_ordering"], )) else: - end_token = str(RoomStreamToken( - topological_ordering, - stream_ordering, - )) + end_token = str(token) return { "before": { |