summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/registration.py40
-rw-r--r--synapse/storage/schema/delta/30/as_users.py23
-rw-r--r--synapse/storage/stream.py9
3 files changed, 54 insertions, 18 deletions
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index ad1157f979..aa49f53458 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -76,7 +76,7 @@ class RegistrationStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def register(self, user_id, token, password_hash,
-                 was_guest=False, make_guest=False):
+                 was_guest=False, make_guest=False, appservice_id=None):
         """Attempts to register an account.
 
         Args:
@@ -87,16 +87,32 @@ class RegistrationStore(SQLBaseStore):
                 upgraded to a non-guest account.
             make_guest (boolean): True if the the new user should be guest,
                 false to add a regular user account.
+            appservice_id (str): The ID of the appservice registering the user.
         Raises:
             StoreError if the user_id could not be registered.
         """
         yield self.runInteraction(
             "register",
-            self._register, user_id, token, password_hash, was_guest, make_guest
+            self._register,
+            user_id,
+            token,
+            password_hash,
+            was_guest,
+            make_guest,
+            appservice_id
         )
         self.is_guest.invalidate((user_id,))
 
-    def _register(self, txn, user_id, token, password_hash, was_guest, make_guest):
+    def _register(
+        self,
+        txn,
+        user_id,
+        token,
+        password_hash,
+        was_guest,
+        make_guest,
+        appservice_id
+    ):
         now = int(self.clock.time())
 
         next_id = self._access_tokens_id_gen.get_next()
@@ -111,9 +127,21 @@ class RegistrationStore(SQLBaseStore):
                             [password_hash, now, 1 if make_guest else 0, user_id])
             else:
                 txn.execute("INSERT INTO users "
-                            "(name, password_hash, creation_ts, is_guest) "
-                            "VALUES (?,?,?,?)",
-                            [user_id, password_hash, now, 1 if make_guest else 0])
+                            "("
+                            "   name,"
+                            "   password_hash,"
+                            "   creation_ts,"
+                            "   is_guest,"
+                            "   appservice_id"
+                            ") "
+                            "VALUES (?,?,?,?,?)",
+                            [
+                                user_id,
+                                password_hash,
+                                now,
+                                1 if make_guest else 0,
+                                appservice_id,
+                            ])
         except self.database_engine.module.IntegrityError:
             raise StoreError(
                 400, "User ID already taken.", errcode=Codes.USER_IN_USE
diff --git a/synapse/storage/schema/delta/30/as_users.py b/synapse/storage/schema/delta/30/as_users.py
index 4da3c59de2..4f6e9dd540 100644
--- a/synapse/storage/schema/delta/30/as_users.py
+++ b/synapse/storage/schema/delta/30/as_users.py
@@ -52,12 +52,17 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
                         " service (IDs %s and %s); assigning arbitrarily to %s" %
                         (user_id, owned[user_id], appservice.id, owned[user_id])
                     )
-                owned[user_id] = appservice.id
-
-    for user_id, as_id in owned.items():
-        cur.execute(
-            database_engine.convert_param_style(
-                "UPDATE users SET appservice_id = ? WHERE name = ?"
-            ),
-            (as_id, user_id)
-        )
+                owned.setdefault(appservice.id, []).append(user_id)
+
+    for as_id, user_ids in owned.items():
+        n = 100
+        user_chunks = (user_ids[i:i + 100] for i in xrange(0, len(user_ids), n))
+        for chunk in user_chunks:
+            cur.execute(
+                database_engine.convert_param_style(
+                    "UPDATE users SET appservice_id = ? WHERE name IN (%s)" % (
+                        ",".join("?" for _ in chunk),
+                    )
+                ),
+                [as_id] + chunk
+            )
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 8908d5b5da..7f4a827528 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -184,6 +184,9 @@ class StreamStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_room_events_stream_for_room(self, room_id, from_key, to_key, limit=0,
                                         order='DESC'):
+        # Note: If from_key is None then we return in topological order. This
+        # is because in that case we're using this as a "get the last few messages
+        # in a room" function, rather than "get new messages since last sync"
         if from_key is not None:
             from_id = RoomStreamToken.parse_stream_token(from_key).stream
         else:
@@ -217,8 +220,8 @@ class StreamStore(SQLBaseStore):
                     " room_id = ?"
                     " AND not outlier"
                     " AND stream_ordering <= ?"
-                    " ORDER BY stream_ordering %s LIMIT ?"
-                ) % (order,)
+                    " ORDER BY topological_ordering %s, stream_ordering %s LIMIT ?"
+                ) % (order, order,)
                 txn.execute(sql, (room_id, to_id, limit))
 
             rows = self.cursor_to_dict(txn)
@@ -232,7 +235,7 @@ class StreamStore(SQLBaseStore):
             get_prev_content=True
         )
 
-        self._set_before_and_after(ret, rows, topo_order=False)
+        self._set_before_and_after(ret, rows, topo_order=from_id is None)
 
         if order.lower() == "desc":
             ret.reverse()