diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index 9996f195a0..fe936b3e62 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -120,7 +120,6 @@ class DataStore(RoomMemberStore, RoomStore,
self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
- self._refresh_tokens_id_gen = IdGenerator(db_conn, "refresh_tokens", "id")
self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
@@ -223,6 +222,7 @@ class DataStore(RoomMemberStore, RoomStore,
)
self._stream_order_on_start = self.get_room_max_stream_ordering()
+ self._min_stream_order_on_start = self.get_room_min_stream_ordering()
super(DataStore, self).__init__(hs)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index d828d6ee1d..b62c459d8b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -561,12 +561,17 @@ class SQLBaseStore(object):
@staticmethod
def _simple_select_onecol_txn(txn, table, keyvalues, retcol):
+ if keyvalues:
+ where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+ else:
+ where = ""
+
sql = (
- "SELECT %(retcol)s FROM %(table)s WHERE %(where)s"
+ "SELECT %(retcol)s FROM %(table)s %(where)s"
) % {
"retcol": retcol,
"table": table,
- "where": " AND ".join("%s = ?" % k for k in keyvalues.keys()),
+ "where": where,
}
txn.execute(sql, keyvalues.values())
@@ -744,10 +749,15 @@ class SQLBaseStore(object):
@staticmethod
def _simple_update_one_txn(txn, table, keyvalues, updatevalues):
- update_sql = "UPDATE %s SET %s WHERE %s" % (
+ if keyvalues:
+ where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
+ else:
+ where = ""
+
+ update_sql = "UPDATE %s SET %s %s" % (
table,
", ".join("%s = ?" % (k,) for k in updatevalues),
- " AND ".join("%s = ?" % (k,) for k in keyvalues)
+ where,
)
txn.execute(
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 3d5994a580..514570561f 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -39,6 +39,14 @@ class ApplicationServiceStore(SQLBaseStore):
def get_app_services(self):
return self.services_cache
+ def get_if_app_services_interested_in_user(self, user_id):
+ """Check if the user is one associated with an app service
+ """
+ for service in self.services_cache:
+ if service.is_interested_in_user(user_id):
+ return True
+ return False
+
def get_app_service_by_user_id(self, user_id):
"""Retrieve an application service from their user ID.
diff --git a/synapse/storage/deviceinbox.py b/synapse/storage/deviceinbox.py
index f640e73714..2821eb89c9 100644
--- a/synapse/storage/deviceinbox.py
+++ b/synapse/storage/deviceinbox.py
@@ -242,7 +242,7 @@ class DeviceInboxStore(SQLBaseStore):
device_id(str): The recipient device_id.
up_to_stream_id(int): Where to delete messages up to.
Returns:
- A deferred that resolves when the messages have been deleted.
+ A deferred that resolves to the number of messages deleted.
"""
def delete_messages_for_device_txn(txn):
sql = (
@@ -251,6 +251,7 @@ class DeviceInboxStore(SQLBaseStore):
" AND stream_id <= ?"
)
txn.execute(sql, (user_id, device_id, up_to_stream_id))
+ return txn.rowcount
return self.runInteraction(
"delete_messages_for_device", delete_messages_for_device_txn
@@ -269,27 +270,29 @@ class DeviceInboxStore(SQLBaseStore):
return defer.succeed([])
def get_all_new_device_messages_txn(txn):
+ # We limit like this as we might have multiple rows per stream_id, and
+ # we want to make sure we always get all entries for any stream_id
+ # we return.
+ upper_pos = min(current_pos, last_pos + limit)
sql = (
- "SELECT stream_id FROM device_inbox"
+ "SELECT stream_id, user_id"
+ " FROM device_inbox"
" WHERE ? < stream_id AND stream_id <= ?"
- " GROUP BY stream_id"
" ORDER BY stream_id ASC"
- " LIMIT ?"
)
- txn.execute(sql, (last_pos, current_pos, limit))
- stream_ids = txn.fetchall()
- if not stream_ids:
- return []
- max_stream_id_in_limit = stream_ids[-1]
+ txn.execute(sql, (last_pos, upper_pos))
+ rows = txn.fetchall()
sql = (
- "SELECT stream_id, user_id, device_id, message_json"
- " FROM device_inbox"
+ "SELECT stream_id, destination"
+ " FROM device_federation_outbox"
" WHERE ? < stream_id AND stream_id <= ?"
" ORDER BY stream_id ASC"
)
- txn.execute(sql, (last_pos, max_stream_id_in_limit))
- return txn.fetchall()
+ txn.execute(sql, (last_pos, upper_pos))
+ rows.extend(txn.fetchall())
+
+ return rows
return self.runInteraction(
"get_all_new_device_messages", get_all_new_device_messages_txn
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 9cd923eb93..7de3e8c58c 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -39,6 +39,14 @@ class EventPushActionsStore(SQLBaseStore):
columns=["user_id", "stream_ordering"],
)
+ self.register_background_index_update(
+ "event_push_actions_highlights_index",
+ index_name="event_push_actions_highlights_index",
+ table="event_push_actions",
+ columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
+ where_clause="highlight=1"
+ )
+
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
"""
Args:
@@ -88,8 +96,11 @@ class EventPushActionsStore(SQLBaseStore):
topological_ordering, stream_ordering
)
+ # First get number of notifications.
+ # We don't need to put a notif=1 clause as all rows always have
+ # notif=1
sql = (
- "SELECT sum(notif), sum(highlight)"
+ "SELECT count(*)"
" FROM event_push_actions ea"
" WHERE"
" user_id = ?"
@@ -99,13 +110,27 @@ class EventPushActionsStore(SQLBaseStore):
txn.execute(sql, (user_id, room_id))
row = txn.fetchone()
- if row:
- return {
- "notify_count": row[0] or 0,
- "highlight_count": row[1] or 0,
- }
- else:
- return {"notify_count": 0, "highlight_count": 0}
+ notify_count = row[0] if row else 0
+
+ # Now get the number of highlights
+ sql = (
+ "SELECT count(*)"
+ " FROM event_push_actions ea"
+ " WHERE"
+ " highlight = 1"
+ " AND user_id = ?"
+ " AND room_id = ?"
+ " AND %s"
+ ) % (lower_bound(token, self.database_engine, inclusive=False),)
+
+ txn.execute(sql, (user_id, room_id))
+ row = txn.fetchone()
+ highlight_count = row[0] if row else 0
+
+ return {
+ "notify_count": notify_count,
+ "highlight_count": highlight_count,
+ }
ret = yield self.runInteraction(
"get_unread_event_push_actions_by_room",
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 49aeb953bd..ecb79c07ef 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -54,6 +54,7 @@ def encode_json(json_object):
else:
return json.dumps(json_object, ensure_ascii=False)
+
# These values are used in the `enqueus_event` and `_do_fetch` methods to
# control how we batch/bulk fetch events from the database.
# The values are plucked out of thing air to make initial sync run faster
diff --git a/synapse/storage/filtering.py b/synapse/storage/filtering.py
index 5248736816..a2ccc66ea7 100644
--- a/synapse/storage/filtering.py
+++ b/synapse/storage/filtering.py
@@ -16,6 +16,7 @@
from twisted.internet import defer
from ._base import SQLBaseStore
+from synapse.api.errors import SynapseError, Codes
from synapse.util.caches.descriptors import cachedInlineCallbacks
import simplejson as json
@@ -24,6 +25,13 @@ import simplejson as json
class FilteringStore(SQLBaseStore):
@cachedInlineCallbacks(num_args=2)
def get_user_filter(self, user_localpart, filter_id):
+ # filter_id is BIGINT UNSIGNED, so if it isn't a number, fail
+ # with a coherent error message rather than 500 M_UNKNOWN.
+ try:
+ int(filter_id)
+ except ValueError:
+ raise SynapseError(400, "Invalid filter ID", Codes.INVALID_PARAM)
+
def_json = yield self._simple_select_one_onecol(
table="user_filters",
keyvalues={
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 6576a30098..e46ae6502e 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -25,7 +25,7 @@ logger = logging.getLogger(__name__)
# Remember to update this number every time a change is made to database
# schema files, so the users will be informed on server restarts.
-SCHEMA_VERSION = 38
+SCHEMA_VERSION = 39
dir_path = os.path.abspath(os.path.dirname(__file__))
diff --git a/synapse/storage/presence.py b/synapse/storage/presence.py
index 21d0696640..7460f98a1f 100644
--- a/synapse/storage/presence.py
+++ b/synapse/storage/presence.py
@@ -37,6 +37,13 @@ class UserPresenceState(namedtuple("UserPresenceState",
status_msg (str): User set status message.
"""
+ def as_dict(self):
+ return dict(self._asdict())
+
+ @staticmethod
+ def from_dict(d):
+ return UserPresenceState(**d)
+
def copy_and_replace(self, **kwargs):
return self._replace(**kwargs)
diff --git a/synapse/storage/push_rule.py b/synapse/storage/push_rule.py
index 49721656b6..cbec255966 100644
--- a/synapse/storage/push_rule.py
+++ b/synapse/storage/push_rule.py
@@ -156,12 +156,20 @@ class PushRuleStore(SQLBaseStore):
event=event,
)
- local_users_in_room = set(u for u in users_in_room if self.hs.is_mine_id(u))
+ # We ignore app service users for now. This is so that we don't fill
+ # up the `get_if_users_have_pushers` cache with AS entries that we
+ # know don't have pushers, nor even read receipts.
+ local_users_in_room = set(
+ u for u in users_in_room
+ if self.hs.is_mine_id(u)
+ and not self.get_if_app_services_interested_in_user(u)
+ )
# users in the room who have pushers need to get push rules run because
# that's how their pushers work
if_users_with_pushers = yield self.get_if_users_have_pushers(
- local_users_in_room, on_invalidate=cache_context.invalidate,
+ local_users_in_room,
+ on_invalidate=cache_context.invalidate,
)
user_ids = set(
uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index 9747a04a9a..f72d15f5ed 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -405,7 +405,7 @@ class ReceiptsStore(SQLBaseStore):
room_id, receipt_type, user_id, event_ids, data
)
- max_persisted_id = self._stream_id_gen.get_current_token()
+ max_persisted_id = self._receipts_id_gen.get_current_token()
defer.returnValue((stream_id, max_persisted_id))
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index e404fa72de..983a8ec52b 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -68,31 +68,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
desc="add_access_token_to_user",
)
- @defer.inlineCallbacks
- def add_refresh_token_to_user(self, user_id, token, device_id=None):
- """Adds a refresh token for the given user.
-
- Args:
- user_id (str): The user ID.
- token (str): The new refresh token to add.
- device_id (str): ID of the device to associate with the access
- token
- Raises:
- StoreError if there was a problem adding this.
- """
- next_id = self._refresh_tokens_id_gen.get_next()
-
- yield self._simple_insert(
- "refresh_tokens",
- {
- "id": next_id,
- "user_id": user_id,
- "token": token,
- "device_id": device_id,
- },
- desc="add_refresh_token_to_user",
- )
-
def register(self, user_id, token=None, password_hash=None,
was_guest=False, make_guest=False, appservice_id=None,
create_profile_with_localpart=None, admin=False):
@@ -353,47 +328,6 @@ class RegistrationStore(background_updates.BackgroundUpdateStore):
token
)
- def exchange_refresh_token(self, refresh_token, token_generator):
- """Exchange a refresh token for a new one.
-
- Doing so invalidates the old refresh token - refresh tokens are single
- use.
-
- Args:
- refresh_token (str): The refresh token of a user.
- token_generator (fn: str -> str): Function which, when given a
- user ID, returns a unique refresh token for that user. This
- function must never return the same value twice.
- Returns:
- tuple of (user_id, new_refresh_token, device_id)
- Raises:
- StoreError if no user was found with that refresh token.
- """
- return self.runInteraction(
- "exchange_refresh_token",
- self._exchange_refresh_token,
- refresh_token,
- token_generator
- )
-
- def _exchange_refresh_token(self, txn, old_token, token_generator):
- sql = "SELECT user_id, device_id FROM refresh_tokens WHERE token = ?"
- txn.execute(sql, (old_token,))
- rows = self.cursor_to_dict(txn)
- if not rows:
- raise StoreError(403, "Did not recognize refresh token")
- user_id = rows[0]["user_id"]
- device_id = rows[0]["device_id"]
-
- # TODO(danielwh): Maybe perform a validation on the macaroon that
- # macaroon.user_id == user_id.
-
- new_token = token_generator(user_id)
- sql = "UPDATE refresh_tokens SET token = ? WHERE token = ?"
- txn.execute(sql, (new_token, old_token,))
-
- return user_id, new_token, device_id
-
@defer.inlineCallbacks
def is_server_admin(self, user):
res = yield self._simple_select_one_onecol(
diff --git a/synapse/storage/room.py b/synapse/storage/room.py
index 11813b44f6..8a2fe2fdf5 100644
--- a/synapse/storage/room.py
+++ b/synapse/storage/room.py
@@ -16,6 +16,7 @@
from twisted.internet import defer
from synapse.api.errors import StoreError
+from synapse.util.caches.descriptors import cached
from ._base import SQLBaseStore
from .engines import PostgresEngine, Sqlite3Engine
@@ -106,7 +107,11 @@ class RoomStore(SQLBaseStore):
entries = self._simple_select_list_txn(
txn,
table="public_room_list_stream",
- keyvalues={"room_id": room_id},
+ keyvalues={
+ "room_id": room_id,
+ "appservice_id": None,
+ "network_id": None,
+ },
retcols=("stream_id", "visibility"),
)
@@ -124,6 +129,8 @@ class RoomStore(SQLBaseStore):
"stream_id": next_id,
"room_id": room_id,
"visibility": is_public,
+ "appservice_id": None,
+ "network_id": None,
}
)
@@ -132,6 +139,87 @@ class RoomStore(SQLBaseStore):
"set_room_is_public",
set_room_is_public_txn, next_id,
)
+ self.hs.get_notifier().on_new_replication_data()
+
+ @defer.inlineCallbacks
+ def set_room_is_public_appservice(self, room_id, appservice_id, network_id,
+ is_public):
+ """Edit the appservice/network specific public room list.
+
+ Each appservice can have a number of published room lists associated
+ with them, keyed off of an appservice defined `network_id`, which
+ basically represents a single instance of a bridge to a third party
+ network.
+
+ Args:
+ room_id (str)
+ appservice_id (str)
+ network_id (str)
+ is_public (bool): Whether to publish or unpublish the room from the
+ list.
+ """
+ def set_room_is_public_appservice_txn(txn, next_id):
+ if is_public:
+ try:
+ self._simple_insert_txn(
+ txn,
+ table="appservice_room_list",
+ values={
+ "appservice_id": appservice_id,
+ "network_id": network_id,
+ "room_id": room_id
+ },
+ )
+ except self.database_engine.module.IntegrityError:
+ # We've already inserted, nothing to do.
+ return
+ else:
+ self._simple_delete_txn(
+ txn,
+ table="appservice_room_list",
+ keyvalues={
+ "appservice_id": appservice_id,
+ "network_id": network_id,
+ "room_id": room_id
+ },
+ )
+
+ entries = self._simple_select_list_txn(
+ txn,
+ table="public_room_list_stream",
+ keyvalues={
+ "room_id": room_id,
+ "appservice_id": appservice_id,
+ "network_id": network_id,
+ },
+ retcols=("stream_id", "visibility"),
+ )
+
+ entries.sort(key=lambda r: r["stream_id"])
+
+ add_to_stream = True
+ if entries:
+ add_to_stream = bool(entries[-1]["visibility"]) != is_public
+
+ if add_to_stream:
+ self._simple_insert_txn(
+ txn,
+ table="public_room_list_stream",
+ values={
+ "stream_id": next_id,
+ "room_id": room_id,
+ "visibility": is_public,
+ "appservice_id": appservice_id,
+ "network_id": network_id,
+ }
+ )
+
+ with self._public_room_id_gen.get_next() as next_id:
+ yield self.runInteraction(
+ "set_room_is_public_appservice",
+ set_room_is_public_appservice_txn, next_id,
+ )
+ self.hs.get_notifier().on_new_replication_data()
def get_public_room_ids(self):
return self._simple_select_onecol(
@@ -259,38 +347,96 @@ class RoomStore(SQLBaseStore):
def get_current_public_room_stream_id(self):
return self._public_room_id_gen.get_current_token()
- def get_public_room_ids_at_stream_id(self, stream_id):
+ @cached(num_args=2, max_entries=100)
+ def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
+ """Get pulbic rooms for a particular list, or across all lists.
+
+ Args:
+ stream_id (int)
+ network_tuple (ThirdPartyInstanceID): The list to use (None, None)
+ means the main list, None means all lsits.
+ """
return self.runInteraction(
"get_public_room_ids_at_stream_id",
- self.get_public_room_ids_at_stream_id_txn, stream_id
+ self.get_public_room_ids_at_stream_id_txn,
+ stream_id, network_tuple=network_tuple
)
- def get_public_room_ids_at_stream_id_txn(self, txn, stream_id):
+ def get_public_room_ids_at_stream_id_txn(self, txn, stream_id,
+ network_tuple):
return {
rm
- for rm, vis in self.get_published_at_stream_id_txn(txn, stream_id).items()
+ for rm, vis in self.get_published_at_stream_id_txn(
+ txn, stream_id, network_tuple=network_tuple
+ ).items()
if vis
}
- def get_published_at_stream_id_txn(self, txn, stream_id):
- sql = ("""
- SELECT room_id, visibility FROM public_room_list_stream
- INNER JOIN (
- SELECT room_id, max(stream_id) AS stream_id
+ def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
+ if network_tuple:
+ # We want to get from a particular list. No aggregation required.
+
+ sql = ("""
+ SELECT room_id, visibility FROM public_room_list_stream
+ INNER JOIN (
+ SELECT room_id, max(stream_id) AS stream_id
+ FROM public_room_list_stream
+ WHERE stream_id <= ? %s
+ GROUP BY room_id
+ ) grouped USING (room_id, stream_id)
+ """)
+
+ if network_tuple.appservice_id is not None:
+ txn.execute(
+ sql % ("AND appservice_id = ? AND network_id = ?",),
+ (stream_id, network_tuple.appservice_id, network_tuple.network_id,)
+ )
+ else:
+ txn.execute(
+ sql % ("AND appservice_id IS NULL",),
+ (stream_id,)
+ )
+ return dict(txn.fetchall())
+ else:
+ # We want to get from all lists, so we need to aggregate the results
+
+ logger.info("Executing full list")
+
+ sql = ("""
+ SELECT room_id, visibility
FROM public_room_list_stream
- WHERE stream_id <= ?
- GROUP BY room_id
- ) grouped USING (room_id, stream_id)
- """)
+ INNER JOIN (
+ SELECT
+ room_id, max(stream_id) AS stream_id, appservice_id,
+ network_id
+ FROM public_room_list_stream
+ WHERE stream_id <= ?
+ GROUP BY room_id, appservice_id, network_id
+ ) grouped USING (room_id, stream_id)
+ """)
- txn.execute(sql, (stream_id,))
- return dict(txn.fetchall())
+ txn.execute(
+ sql,
+ (stream_id,)
+ )
+
+ results = {}
+ # A room is visible if its visible on any list.
+ for room_id, visibility in txn.fetchall():
+ results[room_id] = bool(visibility) or results.get(room_id, False)
- def get_public_room_changes(self, prev_stream_id, new_stream_id):
+ return results
+
+ def get_public_room_changes(self, prev_stream_id, new_stream_id,
+ network_tuple):
def get_public_room_changes_txn(txn):
- then_rooms = self.get_public_room_ids_at_stream_id_txn(txn, prev_stream_id)
+ then_rooms = self.get_public_room_ids_at_stream_id_txn(
+ txn, prev_stream_id, network_tuple
+ )
- now_rooms_dict = self.get_published_at_stream_id_txn(txn, new_stream_id)
+ now_rooms_dict = self.get_published_at_stream_id_txn(
+ txn, new_stream_id, network_tuple
+ )
now_rooms_visible = set(
rm for rm, vis in now_rooms_dict.items() if vis
@@ -311,7 +457,8 @@ class RoomStore(SQLBaseStore):
def get_all_new_public_rooms(self, prev_id, current_id, limit):
def get_all_new_public_rooms(txn):
sql = ("""
- SELECT stream_id, room_id, visibility FROM public_room_list_stream
+ SELECT stream_id, room_id, visibility, appservice_id, network_id
+ FROM public_room_list_stream
WHERE stream_id > ? AND stream_id <= ?
ORDER BY stream_id ASC
LIMIT ?
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 866d64e679..946d5a81cc 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,6 +24,7 @@ from synapse.api.constants import Membership, EventTypes
from synapse.types import get_domain_from_id
import logging
+import ujson as json
logger = logging.getLogger(__name__)
@@ -34,7 +35,15 @@ RoomsForUser = namedtuple(
)
+_MEMBERSHIP_PROFILE_UPDATE_NAME = "room_membership_profile_update"
+
+
class RoomMemberStore(SQLBaseStore):
+ def __init__(self, hs):
+ super(RoomMemberStore, self).__init__(hs)
+ self.register_background_update_handler(
+ _MEMBERSHIP_PROFILE_UPDATE_NAME, self._background_add_membership_profile
+ )
def _store_room_members_txn(self, txn, events, backfilled):
"""Store a room member in the database.
@@ -49,6 +58,8 @@ class RoomMemberStore(SQLBaseStore):
"sender": event.user_id,
"room_id": event.room_id,
"membership": event.membership,
+ "display_name": event.content.get("displayname", None),
+ "avatar_url": event.content.get("avatar_url", None),
}
for event in events
]
@@ -398,7 +409,7 @@ class RoomMemberStore(SQLBaseStore):
table="room_memberships",
column="event_id",
iterable=member_event_ids,
- retcols=['user_id'],
+ retcols=['user_id', 'display_name', 'avatar_url'],
keyvalues={
"membership": Membership.JOIN,
},
@@ -406,11 +417,21 @@ class RoomMemberStore(SQLBaseStore):
desc="_get_joined_users_from_context",
)
- users_in_room = set(row["user_id"] for row in rows)
+ users_in_room = {
+ row["user_id"]: {
+ "display_name": row["display_name"],
+ "avatar_url": row["avatar_url"],
+ }
+ for row in rows
+ }
+
if event is not None and event.type == EventTypes.Member:
if event.membership == Membership.JOIN:
if event.event_id in member_event_ids:
- users_in_room.add(event.state_key)
+ users_in_room[event.state_key] = {
+ "display_name": event.content.get("displayname", None),
+ "avatar_url": event.content.get("avatar_url", None),
+ }
defer.returnValue(users_in_room)
@@ -448,3 +469,78 @@ class RoomMemberStore(SQLBaseStore):
defer.returnValue(True)
defer.returnValue(False)
+
+ @defer.inlineCallbacks
+ def _background_add_membership_profile(self, progress, batch_size):
+ target_min_stream_id = progress.get(
+ "target_min_stream_id_inclusive", self._min_stream_order_on_start
+ )
+ max_stream_id = progress.get(
+ "max_stream_id_exclusive", self._stream_order_on_start + 1
+ )
+
+ INSERT_CLUMP_SIZE = 1000
+
+ def add_membership_profile_txn(txn):
+ sql = ("""
+ SELECT stream_ordering, event_id, events.room_id, content
+ FROM events
+ INNER JOIN room_memberships USING (event_id)
+ WHERE ? <= stream_ordering AND stream_ordering < ?
+ AND type = 'm.room.member'
+ ORDER BY stream_ordering DESC
+ LIMIT ?
+ """)
+
+ txn.execute(sql, (target_min_stream_id, max_stream_id, batch_size))
+
+ rows = self.cursor_to_dict(txn)
+ if not rows:
+ return 0
+
+ min_stream_id = rows[-1]["stream_ordering"]
+
+ to_update = []
+ for row in rows:
+ event_id = row["event_id"]
+ room_id = row["room_id"]
+ try:
+ content = json.loads(row["content"])
+ except:
+ continue
+
+ display_name = content.get("displayname", None)
+ avatar_url = content.get("avatar_url", None)
+
+ if display_name or avatar_url:
+ to_update.append((
+ display_name, avatar_url, event_id, room_id
+ ))
+
+ to_update_sql = ("""
+ UPDATE room_memberships SET display_name = ?, avatar_url = ?
+ WHERE event_id = ? AND room_id = ?
+ """)
+ for index in range(0, len(to_update), INSERT_CLUMP_SIZE):
+ clump = to_update[index:index + INSERT_CLUMP_SIZE]
+ txn.executemany(to_update_sql, clump)
+
+ progress = {
+ "target_min_stream_id_inclusive": target_min_stream_id,
+ "max_stream_id_exclusive": min_stream_id,
+ }
+
+ self._background_update_progress_txn(
+ txn, _MEMBERSHIP_PROFILE_UPDATE_NAME, progress
+ )
+
+ return len(rows)
+
+ result = yield self.runInteraction(
+ _MEMBERSHIP_PROFILE_UPDATE_NAME, add_membership_profile_txn
+ )
+
+ if not result:
+ yield self._end_background_update(_MEMBERSHIP_PROFILE_UPDATE_NAME)
+
+ defer.returnValue(result)
diff --git a/synapse/storage/schema/delta/39/appservice_room_list.sql b/synapse/storage/schema/delta/39/appservice_room_list.sql
new file mode 100644
index 0000000000..74bdc49073
--- /dev/null
+++ b/synapse/storage/schema/delta/39/appservice_room_list.sql
@@ -0,0 +1,29 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE TABLE appservice_room_list(
+ appservice_id TEXT NOT NULL,
+ network_id TEXT NOT NULL,
+ room_id TEXT NOT NULL
+);
+
+-- Each appservice can have multiple published room lists associated with them,
+-- keyed of a particular network_id
+CREATE UNIQUE INDEX appservice_room_list_idx ON appservice_room_list(
+ appservice_id, network_id, room_id
+);
+
+ALTER TABLE public_room_list_stream ADD COLUMN appservice_id TEXT;
+ALTER TABLE public_room_list_stream ADD COLUMN network_id TEXT;
diff --git a/synapse/storage/schema/delta/39/device_federation_stream_idx.sql b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql
new file mode 100644
index 0000000000..00be801e90
--- /dev/null
+++ b/synapse/storage/schema/delta/39/device_federation_stream_idx.sql
@@ -0,0 +1,16 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE INDEX device_federation_outbox_id ON device_federation_outbox(stream_id);
diff --git a/synapse/storage/schema/delta/39/event_push_index.sql b/synapse/storage/schema/delta/39/event_push_index.sql
new file mode 100644
index 0000000000..de2ad93e5c
--- /dev/null
+++ b/synapse/storage/schema/delta/39/event_push_index.sql
@@ -0,0 +1,17 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+ ('event_push_actions_highlights_index', '{}');
diff --git a/synapse/storage/schema/delta/39/federation_out_position.sql b/synapse/storage/schema/delta/39/federation_out_position.sql
new file mode 100644
index 0000000000..5af814290b
--- /dev/null
+++ b/synapse/storage/schema/delta/39/federation_out_position.sql
@@ -0,0 +1,22 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ CREATE TABLE federation_stream_position(
+ type TEXT NOT NULL,
+ stream_id INTEGER NOT NULL
+ );
+
+ INSERT INTO federation_stream_position (type, stream_id) VALUES ('federation', -1);
+ INSERT INTO federation_stream_position (type, stream_id) SELECT 'events', coalesce(max(stream_ordering), -1) FROM events;
diff --git a/synapse/storage/schema/delta/39/membership_profile.sql b/synapse/storage/schema/delta/39/membership_profile.sql
new file mode 100644
index 0000000000..1bf911c8ab
--- /dev/null
+++ b/synapse/storage/schema/delta/39/membership_profile.sql
@@ -0,0 +1,20 @@
+/* Copyright 2016 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE room_memberships ADD COLUMN display_name TEXT;
+ALTER TABLE room_memberships ADD COLUMN avatar_url TEXT;
+
+INSERT into background_updates (update_name, progress_json)
+ VALUES ('room_membership_profile_update', '{}');
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 49abf0ac74..23e7ad9922 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -653,7 +653,10 @@ class StateStore(SQLBaseStore):
else:
state_dict = results[group]
- state_dict.update(group_state_dict)
+ state_dict.update({
+ (intern_string(k[0]), intern_string(k[1])): v
+ for k, v in group_state_dict.items()
+ })
self._state_group_cache.update(
cache_seq_num,
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 888b1cb35d..2dc24951c4 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -541,6 +541,9 @@ class StreamStore(SQLBaseStore):
def get_room_max_stream_ordering(self):
return self._stream_id_gen.get_current_token()
+ def get_room_min_stream_ordering(self):
+ return self._backfill_id_gen.get_current_token()
+
def get_stream_token_for_event(self, event_id):
"""The stream token for an event
Args:
@@ -765,3 +768,50 @@ class StreamStore(SQLBaseStore):
"token": end_token,
},
}
+
+ @defer.inlineCallbacks
+ def get_all_new_events_stream(self, from_id, current_id, limit):
+ """Get all new events"""
+
+ def get_all_new_events_stream_txn(txn):
+ sql = (
+ "SELECT e.stream_ordering, e.event_id"
+ " FROM events AS e"
+ " WHERE"
+ " ? < e.stream_ordering AND e.stream_ordering <= ?"
+ " ORDER BY e.stream_ordering ASC"
+ " LIMIT ?"
+ )
+
+ txn.execute(sql, (from_id, current_id, limit))
+ rows = txn.fetchall()
+
+ upper_bound = current_id
+ if len(rows) == limit:
+ upper_bound = rows[-1][0]
+
+ return upper_bound, [row[1] for row in rows]
+
+ upper_bound, event_ids = yield self.runInteraction(
+ "get_all_new_events_stream", get_all_new_events_stream_txn,
+ )
+
+ events = yield self._get_events(event_ids)
+
+ defer.returnValue((upper_bound, events))
+
+ def get_federation_out_pos(self, typ):
+ return self._simple_select_one_onecol(
+ table="federation_stream_position",
+ retcol="stream_id",
+ keyvalues={"type": typ},
+ desc="get_federation_out_pos"
+ )
+
+ def update_federation_out_pos(self, typ, stream_id):
+ return self._simple_update_one(
+ table="federation_stream_position",
+ keyvalues={"type": typ},
+ updatevalues={"stream_id": stream_id},
+ desc="update_federation_out_pos",
+ )
diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py
index adab520c78..809fdd311f 100644
--- a/synapse/storage/transactions.py
+++ b/synapse/storage/transactions.py
@@ -200,25 +200,48 @@ class TransactionStore(SQLBaseStore):
def _set_destination_retry_timings(self, txn, destination,
retry_last_ts, retry_interval):
- txn.call_after(self.get_destination_retry_timings.invalidate, (destination,))
+ self.database_engine.lock_table(txn, "destinations")
- self._simple_upsert_txn(
+ self._invalidate_cache_and_stream(
+ txn, self.get_destination_retry_timings, (destination,)
+ )
+
+ # We need to be careful here as the data may have changed from under us
+ # due to a worker setting the timings.
+
+ prev_row = self._simple_select_one_txn(
txn,
- "destinations",
+ table="destinations",
keyvalues={
"destination": destination,
},
- values={
- "retry_last_ts": retry_last_ts,
- "retry_interval": retry_interval,
- },
- insertion_values={
- "destination": destination,
- "retry_last_ts": retry_last_ts,
- "retry_interval": retry_interval,
- }
+ retcols=("retry_last_ts", "retry_interval"),
+ allow_none=True,
)
+ if not prev_row:
+ self._simple_insert_txn(
+ txn,
+ table="destinations",
+ values={
+ "destination": destination,
+ "retry_last_ts": retry_last_ts,
+ "retry_interval": retry_interval,
+ }
+ )
+ elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
+ self._simple_update_one_txn(
+ txn,
+ "destinations",
+ keyvalues={
+ "destination": destination,
+ },
+ updatevalues={
+ "retry_last_ts": retry_last_ts,
+ "retry_interval": retry_interval,
+ },
+ )
+
def get_destinations_needing_retry(self):
"""Get all destinations which are due a retry for sending a transaction.
|