diff --git a/jenkins-dendron-haproxy-postgres.sh b/jenkins-dendron-haproxy-postgres.sh
new file mode 100755
index 0000000000..d64b2d2c9d
--- /dev/null
+++ b/jenkins-dendron-haproxy-postgres.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+
+set -eux
+
+: ${WORKSPACE:="$(pwd)"}
+
+export WORKSPACE
+export PYTHONDONTWRITEBYTECODE=yep
+export SYNAPSE_CACHE_FACTOR=1
+
+export HAPROXY_BIN=/home/haproxy/haproxy-1.6.11/haproxy
+
+./jenkins/prepare_synapse.sh
+./jenkins/clone.sh sytest https://github.com/matrix-org/sytest.git
+./jenkins/clone.sh dendron https://github.com/matrix-org/dendron.git
+./dendron/jenkins/build_dendron.sh
+./sytest/jenkins/prep_sytest_for_postgres.sh
+
+./sytest/jenkins/install_and_run.sh \
+ --synapse-directory $WORKSPACE \
+ --dendron $WORKSPACE/dendron/bin/dendron \
+ --haproxy \
diff --git a/jenkins-dendron-postgres.sh b/jenkins-dendron-postgres.sh
index 55ff31fd18..37ae746f4b 100755
--- a/jenkins-dendron-postgres.sh
+++ b/jenkins-dendron-postgres.sh
@@ -17,9 +17,3 @@ export SYNAPSE_CACHE_FACTOR=1
./sytest/jenkins/install_and_run.sh \
--synapse-directory $WORKSPACE \
--dendron $WORKSPACE/dendron/bin/dendron \
- --pusher \
- --synchrotron \
- --federation-reader \
- --client-reader \
- --appservice \
- --federation-sender \
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 419bfef562..e859b3165f 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -16,6 +16,7 @@ from synapse.api import errors
from synapse.api.constants import EventTypes
from synapse.util import stringutils
from synapse.util.async import Linearizer
+from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import measure_func
from synapse.types import get_domain_from_id, RoomStreamToken
from twisted.internet import defer
@@ -34,10 +35,11 @@ class DeviceHandler(BaseHandler):
self.state = hs.get_state_handler()
self.federation_sender = hs.get_federation_sender()
self.federation = hs.get_replication_layer()
- self._remote_edue_linearizer = Linearizer(name="remote_device_list")
+
+ self._edu_updater = DeviceListEduUpdater(hs, self)
self.federation.register_edu_handler(
- "m.device_list_update", self._incoming_device_list_update,
+ "m.device_list_update", self._edu_updater.incoming_device_list_update,
)
self.federation.register_query_handler(
"user_devices", self.on_federation_query_user_devices,
@@ -299,13 +301,69 @@ class DeviceHandler(BaseHandler):
# and those that actually still share a room with the user
defer.returnValue(users_who_share_room & possibly_changed)
- @measure_func("_incoming_device_list_update")
@defer.inlineCallbacks
- def _incoming_device_list_update(self, origin, edu_content):
- user_id = edu_content["user_id"]
- device_id = edu_content["device_id"]
- stream_id = edu_content["stream_id"]
- prev_ids = edu_content.get("prev_id", [])
+ def on_federation_query_user_devices(self, user_id):
+ stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
+ defer.returnValue({
+ "user_id": user_id,
+ "stream_id": stream_id,
+ "devices": devices,
+ })
+
+ @defer.inlineCallbacks
+ def user_left_room(self, user, room_id):
+ user_id = user.to_string()
+ rooms = yield self.store.get_rooms_for_user(user_id)
+ if not rooms:
+ # We no longer share rooms with this user, so we'll no longer
+ # receive device updates. Mark this in DB.
+ yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
+
+
+def _update_device_from_client_ips(device, client_ips):
+ ip = client_ips.get((device["user_id"], device["device_id"]), {})
+ device.update({
+ "last_seen_ts": ip.get("last_seen"),
+ "last_seen_ip": ip.get("ip"),
+ })
+
+
+class DeviceListEduUpdater(object):
+ "Handles incoming device list updates from federation and updates the DB"
+
+ def __init__(self, hs, device_handler):
+ self.store = hs.get_datastore()
+ self.federation = hs.get_replication_layer()
+ self.clock = hs.get_clock()
+ self.device_handler = device_handler
+
+ self._remote_edu_linearizer = Linearizer(name="remote_device_list")
+
+ # user_id -> list of updates waiting to be handled.
+ self._pending_updates = {}
+
+ # Recently seen stream ids. We don't bother keeping these in the DB,
+        # but they're useful to have around to reduce the number of spurious
+ # resyncs.
+ self._seen_updates = ExpiringCache(
+ cache_name="device_update_edu",
+ clock=self.clock,
+ max_len=10000,
+ expiry_ms=30 * 60 * 1000,
+ iterable=True,
+ )
+
+ @defer.inlineCallbacks
+ def incoming_device_list_update(self, origin, edu_content):
+ """Called on incoming device list update from federation. Responsible
+ for parsing the EDU and adding to pending updates list.
+ """
+
+ user_id = edu_content.pop("user_id")
+ device_id = edu_content.pop("device_id")
+ stream_id = str(edu_content.pop("stream_id")) # They may come as ints
+ prev_ids = edu_content.pop("prev_id", [])
+ prev_ids = [str(p) for p in prev_ids] # They may come as ints
if get_domain_from_id(user_id) != origin:
# TODO: Raise?
@@ -318,20 +376,28 @@ class DeviceHandler(BaseHandler):
# probably won't get any further updates.
return
- with (yield self._remote_edue_linearizer.queue(user_id)):
- # If the prev id matches whats in our cache table, then we don't need
- # to resync the users device list, otherwise we do.
- resync = True
- if len(prev_ids) == 1:
- extremity = yield self.store.get_device_list_last_stream_id_for_remote(
- user_id
- )
- logger.info("Extrem: %r, prev_ids: %r", extremity, prev_ids)
- if extremity and prev_ids[0] and int(extremity) >= int(prev_ids[0]):
- resync = False
+ self._pending_updates.setdefault(user_id, []).append(
+ (device_id, stream_id, prev_ids, edu_content)
+ )
+
+ yield self._handle_device_updates(user_id)
+
+ @measure_func("_incoming_device_list_update")
+ @defer.inlineCallbacks
+ def _handle_device_updates(self, user_id):
+ "Actually handle pending updates."
+
+ with (yield self._remote_edu_linearizer.queue(user_id)):
+ pending_updates = self._pending_updates.pop(user_id, [])
+ if not pending_updates:
+ # This can happen since we batch updates
+ return
+
+ resync = yield self._need_to_do_resync(user_id, pending_updates)
if resync:
# Fetch all devices for the user.
+ origin = get_domain_from_id(user_id)
result = yield self.federation.query_user_devices(origin, user_id)
stream_id = result["stream_id"]
devices = result["devices"]
@@ -339,40 +405,50 @@ class DeviceHandler(BaseHandler):
user_id, devices, stream_id,
)
device_ids = [device["device_id"] for device in devices]
- yield self.notify_device_update(user_id, device_ids)
+ yield self.device_handler.notify_device_update(user_id, device_ids)
else:
# Simply update the single device, since we know that is the only
# change (becuase of the single prev_id matching the current cache)
- content = dict(edu_content)
- for key in ("user_id", "device_id", "stream_id", "prev_ids"):
- content.pop(key, None)
- yield self.store.update_remote_device_list_cache_entry(
- user_id, device_id, content, stream_id,
+ for device_id, stream_id, prev_ids, content in pending_updates:
+ yield self.store.update_remote_device_list_cache_entry(
+ user_id, device_id, content, stream_id,
+ )
+
+ yield self.device_handler.notify_device_update(
+ user_id, [device_id for device_id, _, _, _ in pending_updates]
)
- yield self.notify_device_update(user_id, [device_id])
- @defer.inlineCallbacks
- def on_federation_query_user_devices(self, user_id):
- stream_id, devices = yield self.store.get_devices_with_keys_by_user(user_id)
- defer.returnValue({
- "user_id": user_id,
- "stream_id": stream_id,
- "devices": devices,
- })
+ self._seen_updates.setdefault(user_id, set()).update(
+ stream_id for _, stream_id, _, _ in pending_updates
+ )
@defer.inlineCallbacks
- def user_left_room(self, user, room_id):
- user_id = user.to_string()
- rooms = yield self.store.get_rooms_for_user(user_id)
- if not rooms:
- # We no longer share rooms with this user, so we'll no longer
- # receive device updates. Mark this in DB.
- yield self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
+ def _need_to_do_resync(self, user_id, updates):
+ """Given a list of updates for a user figure out if we need to do a full
+ resync, or whether we have enough data that we can just apply the delta.
+ """
+ seen_updates = self._seen_updates.get(user_id, set())
+ extremity = yield self.store.get_device_list_last_stream_id_for_remote(
+ user_id
+ )
-def _update_device_from_client_ips(device, client_ips):
- ip = client_ips.get((device["user_id"], device["device_id"]), {})
- device.update({
- "last_seen_ts": ip.get("last_seen"),
- "last_seen_ip": ip.get("ip"),
- })
+ stream_id_in_updates = set() # stream_ids in updates list
+ for _, stream_id, prev_ids, _ in updates:
+ if not prev_ids:
+ # We always do a resync if there are no previous IDs
+ defer.returnValue(True)
+
+ for prev_id in prev_ids:
+ if prev_id == extremity:
+ continue
+ elif prev_id in seen_updates:
+ continue
+ elif prev_id in stream_id_in_updates:
+ continue
+ else:
+ defer.returnValue(True)
+
+ stream_id_in_updates.add(stream_id)
+
+ defer.returnValue(False)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 4410cd9e62..a7a8ec9b7b 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -488,10 +488,6 @@ class SQLBaseStore(object):
" AND ".join("%s = ?" % (k,) for k in keyvalues)
)
sqlargs = values.values() + keyvalues.values()
- logger.debug(
- "[SQL] %s Args=%s",
- sql, sqlargs,
- )
txn.execute(sql, sqlargs)
if txn.rowcount == 0:
@@ -506,10 +502,6 @@ class SQLBaseStore(object):
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues)
)
- logger.debug(
- "[SQL] %s Args=%s",
- sql, keyvalues.values(),
- )
txn.execute(sql, allvalues.values())
return True
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 7b5903bf8e..bd56ba2515 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -33,6 +33,13 @@ class DeviceStore(SQLBaseStore):
self._prune_old_outbound_device_pokes, 60 * 60 * 1000
)
+ self.register_background_index_update(
+ "device_lists_stream_idx",
+ index_name="device_lists_stream_user_id",
+ table="device_lists_stream",
+ columns=["user_id", "device_id"],
+ )
+
@defer.inlineCallbacks
def store_device(self, user_id, device_id,
initial_device_display_name):
@@ -501,7 +508,7 @@ class DeviceStore(SQLBaseStore):
defer.returnValue(set(changed))
sql = """
- SELECT user_id FROM device_lists_stream WHERE stream_id > ?
+ SELECT DISTINCT user_id FROM device_lists_stream WHERE stream_id > ?
"""
rows = yield self._execute("get_user_whose_devices_changed", None, sql, from_key)
defer.returnValue(set(row[0] for row in rows))
@@ -546,6 +553,16 @@ class DeviceStore(SQLBaseStore):
host, stream_id,
)
+ # Delete older entries in the table, as we really only care about
+ # when the latest change happened.
+ txn.executemany(
+ """
+ DELETE FROM device_lists_stream
+ WHERE user_id = ? AND device_id = ? AND stream_id < ?
+ """,
+ [(user_id, device_id, stream_id) for device_id in device_ids]
+ )
+
self._simple_insert_many_txn(
txn,
table="device_lists_stream",
diff --git a/synapse/storage/schema/delta/41/device_list_stream_idx.sql b/synapse/storage/schema/delta/41/device_list_stream_idx.sql
new file mode 100644
index 0000000000..b7bee8b692
--- /dev/null
+++ b/synapse/storage/schema/delta/41/device_list_stream_idx.sql
@@ -0,0 +1,17 @@
+/* Copyright 2017 Vector Creations Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT into background_updates (update_name, progress_json)
+ VALUES ('device_lists_stream_idx', '{}');
|