diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index ad1157f979..aa49f53458 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -76,7 +76,7 @@ class RegistrationStore(SQLBaseStore):
@defer.inlineCallbacks
def register(self, user_id, token, password_hash,
- was_guest=False, make_guest=False):
+ was_guest=False, make_guest=False, appservice_id=None):
"""Attempts to register an account.
Args:
@@ -87,16 +87,32 @@ class RegistrationStore(SQLBaseStore):
upgraded to a non-guest account.
make_guest (boolean): True if the the new user should be guest,
false to add a regular user account.
+ appservice_id (str): The ID of the appservice registering the user.
Raises:
StoreError if the user_id could not be registered.
"""
yield self.runInteraction(
"register",
- self._register, user_id, token, password_hash, was_guest, make_guest
+ self._register,
+ user_id,
+ token,
+ password_hash,
+ was_guest,
+ make_guest,
+ appservice_id
)
self.is_guest.invalidate((user_id,))
- def _register(self, txn, user_id, token, password_hash, was_guest, make_guest):
+ def _register(
+ self,
+ txn,
+ user_id,
+ token,
+ password_hash,
+ was_guest,
+ make_guest,
+ appservice_id
+ ):
now = int(self.clock.time())
next_id = self._access_tokens_id_gen.get_next()
@@ -111,9 +127,21 @@ class RegistrationStore(SQLBaseStore):
[password_hash, now, 1 if make_guest else 0, user_id])
else:
txn.execute("INSERT INTO users "
- "(name, password_hash, creation_ts, is_guest) "
- "VALUES (?,?,?,?)",
- [user_id, password_hash, now, 1 if make_guest else 0])
+ "("
+ " name,"
+ " password_hash,"
+ " creation_ts,"
+ " is_guest,"
+ " appservice_id"
+ ") "
+ "VALUES (?,?,?,?,?)",
+ [
+ user_id,
+ password_hash,
+ now,
+ 1 if make_guest else 0,
+ appservice_id,
+ ])
except self.database_engine.module.IntegrityError:
raise StoreError(
400, "User ID already taken.", errcode=Codes.USER_IN_USE
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index 4da3c59de2..4f6e9dd540 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -52,12 +52,17 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
" service (IDs %s and %s); assigning arbitrarily to %s" %
(user_id, owned[user_id], appservice.id, owned[user_id])
)
- owned[user_id] = appservice.id
-
- for user_id, as_id in owned.items():
- cur.execute(
- database_engine.convert_param_style(
- "UPDATE users SET appservice_id = ? WHERE name = ?"
- ),
- (as_id, user_id)
- )
+ owned.setdefault(appservice.id, []).append(user_id)
+
+ for as_id, user_ids in owned.items():
+ n = 100
+ user_chunks = (user_ids[i:i + 100] for i in xrange(0, len(user_ids), n))
+ for chunk in user_chunks:
+ cur.execute(
+ database_engine.convert_param_style(
+ "UPDATE users SET appservice_id = ? WHERE name IN (%s)" % (
+ ",".join("?" for _ in chunk),
+ )
+ ),
+ [as_id] + chunk
+ )
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 8908d5b5da..7f4a827528 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -184,6 +184,9 @@ class StreamStore(SQLBaseStore):
@defer.inlineCallbacks
def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
order='DESC'):
+ # Note: If from_key is None then we return in topological order. This
+ # is because in that case we're using this as a "get the last few messages
+ # in a room" function, rather than "get new messages since last sync"
if from_key is not None:
from_id = RoomStreamToken.parse_stream_token(from_key).stream
else:
@@ -217,8 +220,8 @@ class StreamStore(SQLBaseStore):
" room_id = ?"
" AND not outlier"
" AND stream_ordering <= ?"
- " ORDER BY stream_ordering %s LIMIT ?"
- ) % (order,)
+ " ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?"
+ ) % (order, order,)
txn.execute(sql, (room_id, to_id, limit))
rows = self.cursor_to_dict(txn)
@@ -232,7 +235,7 @@ class StreamStore(SQLBaseStore):
get_prev_content=True
)
- self._set_before_and_after(ret, rows, topo_order=False)
+ self._set_before_and_after(ret, rows, topo_order=from_id is None)
if order.lower() == "desc":
ret.reverse()
|