diff --git a/scripts/register_new_matrix_user b/scripts/register_new_matrix_user
index 0ca83795a3..4a520bdb5d 100755
--- a/scripts/register_new_matrix_user
+++ b/scripts/register_new_matrix_user
@@ -33,9 +33,10 @@ def request_registration(user, password, server_location, shared_secret):
).hexdigest()
data = {
- "username": user,
+ "user": user,
"password": password,
"mac": mac,
+ "type": "org.matrix.login.shared_secret",
}
server_location = server_location.rstrip("/")
@@ -43,7 +44,7 @@ def request_registration(user, password, server_location, shared_secret):
print "Sending registration request..."
req = urllib2.Request(
- "%s/_matrix/client/v2_alpha/register" % (server_location,),
+ "%s/_matrix/client/api/v1/register" % (server_location,),
data=json.dumps(data),
headers={'Content-Type': 'application/json'}
)
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index fa43211415..f3513abb55 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -277,9 +277,12 @@ class SynapseHomeServer(HomeServer):
config,
metrics_resource,
),
- interface="127.0.0.1",
+ interface=config.metrics_bind_host,
+ )
+ logger.info(
+ "Metrics now running on %s port %d",
+ config.metrics_bind_host, config.metrics_port,
)
- logger.info("Metrics now running on 127.0.0.1 port %d", config.metrics_port)
def run_startup_checks(self, db_conn, database_engine):
all_users_native = are_all_users_on_domain(
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 63a18b802b..e3ca45de83 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -148,8 +148,8 @@ class ApplicationService(object):
and self.is_interested_in_user(event.state_key)):
return True
# check joined member events
- for member in member_list:
- if self.is_interested_in_user(member.state_key):
+ for user_id in member_list:
+ if self.is_interested_in_user(user_id):
return True
return False
@@ -173,7 +173,7 @@ class ApplicationService(object):
restrict_to(str): The namespace to restrict regex tests to.
aliases_for_event(list): A list of all the known room aliases for
this event.
- member_list(list): A list of all joined room members in this room.
+ member_list(list): A list of all joined user_ids in this room.
Returns:
bool: True if this service would like to know about this event.
"""
diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py
index 71a1b1d189..0cfb30ce7f 100644
--- a/synapse/config/metrics.py
+++ b/synapse/config/metrics.py
@@ -20,6 +20,7 @@ class MetricsConfig(Config):
def read_config(self, config):
self.enable_metrics = config["enable_metrics"]
self.metrics_port = config.get("metrics_port")
+ self.metrics_bind_host = config.get("metrics_bind_host", "127.0.0.1")
def default_config(self, config_dir_path, server_name):
return """\
@@ -28,6 +29,9 @@ class MetricsConfig(Config):
# Enable collection and rendering of performance metrics
enable_metrics: False
- # Separate port to accept metrics requests on (on localhost)
+ # Separate port to accept metrics requests on
# metrics_port: 8081
+
+ # Which host to bind the metric listener to
+ # metrics_bind_host: 127.0.0.1
"""
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index 5217d91aab..f0430b2cb1 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -80,6 +80,7 @@ class FederationBase(object):
destinations=[pdu.origin],
event_id=pdu.event_id,
outlier=outlier,
+ timeout=10000,
)
if new_pdu:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 3a7bc0c9a7..d3b46b24c1 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -22,6 +22,7 @@ from .units import Edu
from synapse.api.errors import (
CodeMessageException, HttpResponseException, SynapseError,
)
+from synapse.util import unwrapFirstError
from synapse.util.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
@@ -164,16 +165,17 @@ class FederationClient(FederationBase):
for p in transaction_data["pdus"]
]
- for i, pdu in enumerate(pdus):
- pdus[i] = yield self._check_sigs_and_hash(pdu)
-
- # FIXME: We should handle signature failures more gracefully.
+ # FIXME: We should handle signature failures more gracefully.
+ pdus[:] = yield defer.gatherResults(
+ [self._check_sigs_and_hash(pdu) for pdu in pdus],
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError)
defer.returnValue(pdus)
@defer.inlineCallbacks
@log_function
- def get_pdu(self, destinations, event_id, outlier=False):
+ def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
"""Requests the PDU with given origin and ID from the remote home
servers.
@@ -189,6 +191,8 @@ class FederationClient(FederationBase):
outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
it's from an arbitary point in the context as opposed to part
of the current block of PDUs. Defaults to `False`
+ timeout (int): How long to try (in ms) each destination for before
+ moving to the next destination. None indicates no timeout.
Returns:
Deferred: Results in the requested PDU.
@@ -212,7 +216,7 @@ class FederationClient(FederationBase):
with limiter:
transaction_data = yield self.transport_layer.get_event(
- destination, event_id
+ destination, event_id, timeout=timeout,
)
logger.debug("transaction_data %r", transaction_data)
@@ -370,13 +374,17 @@ class FederationClient(FederationBase):
for p in content.get("auth_chain", [])
]
- signed_state = yield self._check_sigs_and_hash_and_fetch(
- destination, state, outlier=True
- )
-
- signed_auth = yield self._check_sigs_and_hash_and_fetch(
- destination, auth_chain, outlier=True
- )
+ signed_state, signed_auth = yield defer.gatherResults(
+ [
+ self._check_sigs_and_hash_and_fetch(
+ destination, state, outlier=True
+ ),
+ self._check_sigs_and_hash_and_fetch(
+ destination, auth_chain, outlier=True
+ )
+ ],
+ consumeErrors=True
+ ).addErrback(unwrapFirstError)
auth_chain.sort(key=lambda e: e.depth)
@@ -518,7 +526,7 @@ class FederationClient(FederationBase):
# Are we missing any?
seen_events = set(earliest_events_ids)
- seen_events.update(e.event_id for e in signed_events)
+ seen_events.update(e.event_id for e in signed_events if e)
missing_events = {}
for e in itertools.chain(latest_events, signed_events):
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index ca04822fb3..32fa5e8c15 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -207,13 +207,13 @@ class TransactionQueue(object):
# request at which point pending_pdus_by_dest just keeps growing.
# we need application-layer timeouts of some flavour of these
# requests
- logger.info(
+ logger.debug(
"TX [%s] Transaction already in progress",
destination
)
return
- logger.info("TX [%s] _attempt_new_transaction", destination)
+ logger.debug("TX [%s] _attempt_new_transaction", destination)
# list of (pending_pdu, deferred, order)
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
@@ -221,11 +221,11 @@ class TransactionQueue(object):
pending_failures = self.pending_failures_by_dest.pop(destination, [])
if pending_pdus:
- logger.info("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
- destination, len(pending_pdus))
+ logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
+ destination, len(pending_pdus))
if not pending_pdus and not pending_edus and not pending_failures:
- logger.info("TX [%s] Nothing to send", destination)
+ logger.debug("TX [%s] Nothing to send", destination)
return
# Sort based on the order field
@@ -242,6 +242,8 @@ class TransactionQueue(object):
try:
self.pending_transactions[destination] = 1
+ txn_id = str(self._next_txn_id)
+
limiter = yield get_retry_limiter(
destination,
self._clock,
@@ -249,9 +251,9 @@ class TransactionQueue(object):
)
logger.debug(
- "TX [%s] Attempting new transaction"
+ "TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d, failures: %d)",
- destination,
+ destination, txn_id,
len(pending_pdus),
len(pending_edus),
len(pending_failures)
@@ -261,7 +263,7 @@ class TransactionQueue(object):
transaction = Transaction.create_new(
origin_server_ts=int(self._clock.time_msec()),
- transaction_id=str(self._next_txn_id),
+ transaction_id=txn_id,
origin=self.server_name,
destination=destination,
pdus=pdus,
@@ -275,9 +277,13 @@ class TransactionQueue(object):
logger.debug("TX [%s] Persisted transaction", destination)
logger.info(
- "TX [%s] Sending transaction [%s]",
- destination,
+ "TX [%s] {%s} Sending transaction [%s],"
+ " (PDUs: %d, EDUs: %d, failures: %d)",
+ destination, txn_id,
transaction.transaction_id,
+ len(pending_pdus),
+ len(pending_edus),
+ len(pending_failures),
)
with limiter:
@@ -313,7 +319,10 @@ class TransactionQueue(object):
code = e.code
response = e.response
- logger.info("TX [%s] got %d response", destination, code)
+ logger.info(
+ "TX [%s] {%s} got %d response",
+ destination, txn_id, code
+ )
logger.debug("TX [%s] Sent transaction", destination)
logger.debug("TX [%s] Marking as delivered...", destination)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 80d03012b7..610a4c3163 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -50,13 +50,15 @@ class TransportLayerClient(object):
)
@log_function
- def get_event(self, destination, event_id):
+ def get_event(self, destination, event_id, timeout=None):
""" Requests the pdu with give id and origin from the given server.
Args:
destination (str): The host name of the remote home server we want
to get the state from.
event_id (str): The id of the event being requested.
+ timeout (int): How long to try (in ms) the destination for before
+ giving up. None indicates no timeout.
Returns:
Deferred: Results in a dict received from the remote homeserver.
@@ -65,7 +67,7 @@ class TransportLayerClient(object):
destination, event_id)
path = PREFIX + "/event/%s/" % (event_id, )
- return self.client.get_json(destination, path=path)
+ return self.client.get_json(destination, path=path, timeout=timeout)
@log_function
def backfill(self, destination, room_id, event_tuples, limit):
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 2bfe0f3c9b..af87805f34 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -196,6 +196,14 @@ class FederationSendServlet(BaseFederationServlet):
transaction_id, str(transaction_data)
)
+ logger.info(
+ "Received txn %s from %s. (PDUs: %d, EDUs: %d, failures: %d)",
+ transaction_id, origin,
+ len(transaction_data.get("pdus", [])),
+ len(transaction_data.get("edus", [])),
+ len(transaction_data.get("failures", [])),
+ )
+
# We should ideally be getting this from the security layer.
# origin = body["origin"]
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 355ab317df..8269482e47 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -15,7 +15,7 @@
from twisted.internet import defer
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventTypes
from synapse.appservice import ApplicationService
from synapse.types import UserID
@@ -147,10 +147,7 @@ class ApplicationServicesHandler(object):
)
# We need to know the members associated with this event.room_id,
# if any.
- member_list = yield self.store.get_room_members(
- room_id=event.room_id,
- membership=Membership.JOIN
- )
+ member_list = yield self.store.get_users_in_room(event.room_id)
services = yield self.store.get_app_services()
interested_list = [
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d35d9f603c..46ce3699d7 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -230,27 +230,65 @@ class FederationHandler(BaseHandler):
if not extremities:
extremities = yield self.store.get_oldest_events_in_room(room_id)
- pdus = yield self.replication_layer.backfill(
+ events = yield self.replication_layer.backfill(
dest,
room_id,
- limit,
+ limit=limit,
extremities=extremities,
)
- events = []
+ event_map = {e.event_id: e for e in events}
- for pdu in pdus:
- event = pdu
+ event_ids = set(e.event_id for e in events)
- # FIXME (erikj): Not sure this actually works :/
- context = yield self.state_handler.compute_event_context(event)
+ edges = [
+ ev.event_id
+ for ev in events
+ if set(e_id for e_id, _ in ev.prev_events) - event_ids
+ ]
- events.append((event, context))
+ # For each edge get the current state.
- yield self.store.persist_event(
- event,
- context=context,
- backfilled=True
+ auth_events = {}
+ events_to_state = {}
+ for e_id in edges:
+ state, auth = yield self.replication_layer.get_state_for_room(
+ destination=dest,
+ room_id=room_id,
+ event_id=e_id
+ )
+ auth_events.update({a.event_id: a for a in auth})
+ events_to_state[e_id] = state
+
+ yield defer.gatherResults(
+ [
+ self._handle_new_event(dest, a)
+ for a in auth_events.values()
+ ],
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError)
+
+ yield defer.gatherResults(
+ [
+ self._handle_new_event(
+ dest, event_map[e_id],
+ state=events_to_state[e_id],
+ backfilled=True,
+ )
+ for e_id in events_to_state
+ ],
+ consumeErrors=True
+ ).addErrback(unwrapFirstError)
+
+ events.sort(key=lambda e: e.depth)
+
+ for event in events:
+ if event in events_to_state:
+ continue
+
+ yield self._handle_new_event(
+ dest, event,
+ backfilled=True,
)
defer.returnValue(events)
@@ -347,7 +385,7 @@ class FederationHandler(BaseHandler):
logger.info(e.message)
continue
except Exception as e:
- logger.warn(
+ logger.exception(
"Failed to backfill from %s because %s",
dom, e,
)
@@ -517,30 +555,14 @@ class FederationHandler(BaseHandler):
# FIXME
pass
- for e in auth_chain:
- e.internal_metadata.outlier = True
-
- if e.event_id == event.event_id:
- continue
-
- try:
- auth_ids = [e_id for e_id, _ in e.auth_events]
- auth = {
- (e.type, e.state_key): e for e in auth_chain
- if e.event_id in auth_ids
- }
- yield self._handle_new_event(
- origin, e, auth_events=auth
- )
- except:
- logger.exception(
- "Failed to handle auth event %s",
- e.event_id,
- )
+ yield self._handle_auth_events(
+ origin, [e for e in auth_chain if e.event_id != event.event_id]
+ )
- for e in state:
+ @defer.inlineCallbacks
+ def handle_state(e):
if e.event_id == event.event_id:
- continue
+ return
e.internal_metadata.outlier = True
try:
@@ -558,6 +580,8 @@ class FederationHandler(BaseHandler):
e.event_id,
)
+ yield defer.DeferredList([handle_state(e) for e in state])
+
auth_ids = [e_id for e_id, _ in event.auth_events]
auth_events = {
(e.type, e.state_key): e for e in auth_chain
@@ -893,9 +917,12 @@ class FederationHandler(BaseHandler):
# This is a hack to fix some old rooms where the initial join event
# didn't reference the create event in its auth events.
if event.type == EventTypes.Member and not event.auth_events:
- if len(event.prev_events) == 1:
- c = yield self.store.get_event(event.prev_events[0][0])
- if c.type == EventTypes.Create:
+ if len(event.prev_events) == 1 and event.depth < 5:
+ c = yield self.store.get_event(
+ event.prev_events[0][0],
+ allow_none=True,
+ )
+ if c and c.type == EventTypes.Create:
auth_events[(c.type, c.state_key)] = c
try:
@@ -1314,3 +1341,52 @@ class FederationHandler(BaseHandler):
},
"missing": [e.event_id for e in missing_locals],
})
+
+ @defer.inlineCallbacks
+ def _handle_auth_events(self, origin, auth_events):
+ auth_ids_to_deferred = {}
+
+ def process_auth_ev(ev):
+ auth_ids = [e_id for e_id, _ in ev.auth_events]
+
+ prev_ds = [
+ auth_ids_to_deferred[i]
+ for i in auth_ids
+ if i in auth_ids_to_deferred
+ ]
+
+ d = defer.Deferred()
+
+ auth_ids_to_deferred[ev.event_id] = d
+
+ @defer.inlineCallbacks
+ def f(*_):
+ ev.internal_metadata.outlier = True
+
+ try:
+ auth = {
+ (e.type, e.state_key): e for e in auth_events
+ if e.event_id in auth_ids
+ }
+
+ yield self._handle_new_event(
+ origin, ev, auth_events=auth
+ )
+ except:
+ logger.exception(
+ "Failed to handle auth event %s",
+ ev.event_id,
+ )
+
+ d.callback(None)
+
+ if prev_ds:
+ dx = defer.DeferredList(prev_ds)
+ dx.addBoth(f)
+ else:
+ f()
+
+ for e in auth_events:
+ process_auth_ev(e)
+
+ yield defer.DeferredList(auth_ids_to_deferred.values())
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 40794187b1..670c1d353f 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -146,6 +146,10 @@ class PresenceHandler(BaseHandler):
self._user_cachemap = {}
self._user_cachemap_latest_serial = 0
+ # map room_ids to the latest presence serial for a member of that
+ # room
+ self._room_serials = {}
+
metrics.register_callback(
"userCachemap:size",
lambda: len(self._user_cachemap),
@@ -297,13 +301,34 @@ class PresenceHandler(BaseHandler):
self.changed_presencelike_data(user, {"last_active": now})
+ def get_joined_rooms_for_user(self, user):
+ """Get the list of rooms a user is joined to.
+
+ Args:
+ user(UserID): The user.
+ Returns:
+ A Deferred of a list of room id strings.
+ """
+ rm_handler = self.homeserver.get_handlers().room_member_handler
+ return rm_handler.get_joined_rooms_for_user(user)
+
+ def get_joined_users_for_room_id(self, room_id):
+ rm_handler = self.homeserver.get_handlers().room_member_handler
+ return rm_handler.get_room_members(room_id)
+
+ @defer.inlineCallbacks
def changed_presencelike_data(self, user, state):
- statuscache = self._get_or_make_usercache(user)
+ """Updates the presence state of a local user.
+ Args:
+ user(UserID): The user being updated.
+ state(dict): The new presence state for the user.
+ Returns:
+ A Deferred
+ """
self._user_cachemap_latest_serial += 1
- statuscache.update(state, serial=self._user_cachemap_latest_serial)
-
- return self.push_presence(user, statuscache=statuscache)
+ statuscache = yield self.update_presence_cache(user, state)
+ yield self.push_presence(user, statuscache=statuscache)
@log_function
def started_user_eventstream(self, user):
@@ -326,13 +351,12 @@ class PresenceHandler(BaseHandler):
room_id(str): The room id the user joined.
"""
if self.hs.is_mine(user):
- statuscache = self._get_or_make_usercache(user)
-
# No actual update but we need to bump the serial anyway for the
# event source
self._user_cachemap_latest_serial += 1
- statuscache.update({}, serial=self._user_cachemap_latest_serial)
-
+ statuscache = yield self.update_presence_cache(
+ user, room_ids=[room_id]
+ )
self.push_update_to_local_and_remote(
observed_user=user,
room_ids=[room_id],
@@ -340,16 +364,17 @@ class PresenceHandler(BaseHandler):
)
# We also want to tell them about current presence of people.
- rm_handler = self.homeserver.get_handlers().room_member_handler
- curr_users = yield rm_handler.get_room_members(room_id)
+ curr_users = yield self.get_joined_users_for_room_id(room_id)
for local_user in [c for c in curr_users if self.hs.is_mine(c)]:
- statuscache = self._get_or_offline_usercache(local_user)
- statuscache.update({}, serial=self._user_cachemap_latest_serial)
+ statuscache = yield self.update_presence_cache(
+ local_user, room_ids=[room_id], add_to_cache=False
+ )
+
self.push_update_to_local_and_remote(
observed_user=local_user,
users_to_push=[user],
- statuscache=self._get_or_offline_usercache(local_user),
+ statuscache=statuscache,
)
@defer.inlineCallbacks
@@ -546,8 +571,7 @@ class PresenceHandler(BaseHandler):
# Also include people in all my rooms
- rm_handler = self.homeserver.get_handlers().room_member_handler
- room_ids = yield rm_handler.get_joined_rooms_for_user(user)
+ room_ids = yield self.get_joined_rooms_for_user(user)
if state is None:
state = yield self.store.get_presence_state(user.localpart)
@@ -747,8 +771,7 @@ class PresenceHandler(BaseHandler):
# and also user is informed of server-forced pushes
localusers.add(user)
- rm_handler = self.homeserver.get_handlers().room_member_handler
- room_ids = yield rm_handler.get_joined_rooms_for_user(user)
+ room_ids = yield self.get_joined_rooms_for_user(user)
if not localusers and not room_ids:
defer.returnValue(None)
@@ -793,8 +816,7 @@ class PresenceHandler(BaseHandler):
" | %d interested local observers %r", len(observers), observers
)
- rm_handler = self.homeserver.get_handlers().room_member_handler
- room_ids = yield rm_handler.get_joined_rooms_for_user(user)
+ room_ids = yield self.get_joined_rooms_for_user(user)
if room_ids:
logger.debug(" | %d interested room IDs %r", len(room_ids), room_ids)
@@ -813,10 +835,8 @@ class PresenceHandler(BaseHandler):
self.clock.time_msec() - state.pop("last_active_ago")
)
- statuscache = self._get_or_make_usercache(user)
-
self._user_cachemap_latest_serial += 1
- statuscache.update(state, serial=self._user_cachemap_latest_serial)
+ yield self.update_presence_cache(user, state, room_ids=room_ids)
if not observers and not room_ids:
logger.debug(" | no interested observers or room IDs")
@@ -875,6 +895,35 @@ class PresenceHandler(BaseHandler):
yield defer.DeferredList(deferreds, consumeErrors=True)
@defer.inlineCallbacks
+ def update_presence_cache(self, user, state={}, room_ids=None,
+ add_to_cache=True):
+ """Update the presence cache for a user with a new state and bump the
+ serial to the latest value.
+
+ Args:
+ user(UserID): The user being updated
+ state(dict): The presence state being updated
+ room_ids(None or list of str): A list of room_ids to update. If
+ room_ids is None then fetch the list of room_ids the user is
+ joined to.
+ add_to_cache: Whether to add an entry to the presence cache if the
+ user isn't already in the cache.
+ Returns:
+ A Deferred UserPresenceCache for the user being updated.
+ """
+ if room_ids is None:
+ room_ids = yield self.get_joined_rooms_for_user(user)
+
+ for room_id in room_ids:
+ self._room_serials[room_id] = self._user_cachemap_latest_serial
+ if add_to_cache:
+ statuscache = self._get_or_make_usercache(user)
+ else:
+ statuscache = self._get_or_offline_usercache(user)
+ statuscache.update(state, serial=self._user_cachemap_latest_serial)
+ defer.returnValue(statuscache)
+
+ @defer.inlineCallbacks
def push_update_to_local_and_remote(self, observed_user, statuscache,
users_to_push=[], room_ids=[],
remote_domains=[]):
@@ -997,38 +1046,10 @@ class PresenceEventSource(object):
self.clock = hs.get_clock()
@defer.inlineCallbacks
- def is_visible(self, observer_user, observed_user):
- if observer_user == observed_user:
- defer.returnValue(True)
-
- presence = self.hs.get_handlers().presence_handler
-
- if (yield presence.store.user_rooms_intersect(
- [u.to_string() for u in observer_user, observed_user])):
- defer.returnValue(True)
-
- if self.hs.is_mine(observed_user):
- pushmap = presence._local_pushmap
-
- defer.returnValue(
- observed_user.localpart in pushmap and
- observer_user in pushmap[observed_user.localpart]
- )
- else:
- recvmap = presence._remote_recvmap
-
- defer.returnValue(
- observed_user in recvmap and
- observer_user in recvmap[observed_user]
- )
-
- @defer.inlineCallbacks
@log_function
def get_new_events_for_user(self, user, from_key, limit):
from_key = int(from_key)
- observer_user = user
-
presence = self.hs.get_handlers().presence_handler
cachemap = presence._user_cachemap
@@ -1037,17 +1058,27 @@ class PresenceEventSource(object):
clock = self.clock
latest_serial = 0
+ user_ids_to_check = {user}
+ presence_list = yield presence.store.get_presence_list(
+ user.localpart, accepted=True
+ )
+ if presence_list is not None:
+ user_ids_to_check |= set(
+ UserID.from_string(p["observed_user_id"]) for p in presence_list
+ )
+ room_ids = yield presence.get_joined_rooms_for_user(user)
+ for room_id in set(room_ids) & set(presence._room_serials):
+ if presence._room_serials[room_id] > from_key:
+ joined = yield presence.get_joined_users_for_room_id(room_id)
+ user_ids_to_check |= set(joined)
+
updates = []
- # TODO(paul): use a DeferredList ? How to limit concurrency.
- for observed_user in cachemap.keys():
+ for observed_user in user_ids_to_check & set(cachemap):
cached = cachemap[observed_user]
if cached.serial <= from_key or cached.serial > max_serial:
continue
- if not (yield self.is_visible(observer_user, observed_user)):
- continue
-
latest_serial = max(cached.serial, latest_serial)
updates.append(cached.make_event(user=observed_user, clock=clock))
@@ -1084,8 +1115,6 @@ class PresenceEventSource(object):
def get_pagination_rows(self, user, pagination_config, key):
# TODO (erikj): Does this make sense? Ordering?
- observer_user = user
-
from_key = int(pagination_config.from_key)
if pagination_config.to_key:
@@ -1096,14 +1125,26 @@ class PresenceEventSource(object):
presence = self.hs.get_handlers().presence_handler
cachemap = presence._user_cachemap
+ user_ids_to_check = {user}
+ presence_list = yield presence.store.get_presence_list(
+ user.localpart, accepted=True
+ )
+ if presence_list is not None:
+ user_ids_to_check |= set(
+ UserID.from_string(p["observed_user_id"]) for p in presence_list
+ )
+ room_ids = yield presence.get_joined_rooms_for_user(user)
+ for room_id in set(room_ids) & set(presence._room_serials):
+ if presence._room_serials[room_id] >= from_key:
+ joined = yield presence.get_joined_users_for_room_id(room_id)
+ user_ids_to_check |= set(joined)
+
updates = []
- # TODO(paul): use a DeferredList ? How to limit concurrency.
- for observed_user in cachemap.keys():
+ for observed_user in user_ids_to_check & set(cachemap):
if not (to_key < cachemap[observed_user].serial <= from_key):
continue
- if (yield self.is_visible(observer_user, observed_user)):
- updates.append((observed_user, cachemap[observed_user]))
+ updates.append((observed_user, cachemap[observed_user]))
# TODO(paul): limit
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index c99d237c73..6f976d5ce8 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -110,7 +110,8 @@ class MatrixFederationHttpClient(object):
@defer.inlineCallbacks
def _create_request(self, destination, method, path_bytes,
body_callback, headers_dict={}, param_bytes=b"",
- query_bytes=b"", retry_on_dns_fail=True):
+ query_bytes=b"", retry_on_dns_fail=True,
+ timeout=None):
""" Creates and sends a request to the given url
"""
headers_dict[b"User-Agent"] = [self.version_string]
@@ -158,7 +159,7 @@ class MatrixFederationHttpClient(object):
response = yield self.clock.time_bound_deferred(
request_deferred,
- time_out=60,
+ time_out=timeout/1000. if timeout else 60,
)
logger.debug("Got response to %s", method)
@@ -181,7 +182,7 @@ class MatrixFederationHttpClient(object):
_flatten_response_never_received(e),
)
- if retries_left:
+ if retries_left and not timeout:
yield sleep(2 ** (5 - retries_left))
retries_left -= 1
else:
@@ -334,7 +335,8 @@ class MatrixFederationHttpClient(object):
defer.returnValue(json.loads(body))
@defer.inlineCallbacks
- def get_json(self, destination, path, args={}, retry_on_dns_fail=True):
+ def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
+ timeout=None):
""" GETs some json from the given host homeserver and path
Args:
@@ -343,6 +345,9 @@ class MatrixFederationHttpClient(object):
path (str): The HTTP path.
args (dict): A dictionary used to create query strings, defaults to
None.
+ timeout (int): How long to try (in ms) the destination for before
+ giving up. None indicates no timeout and that the request will
+ be retried.
Returns:
Deferred: Succeeds when we get *any* HTTP response.
@@ -370,7 +375,8 @@ class MatrixFederationHttpClient(object):
path.encode("ascii"),
query_bytes=query_bytes,
body_callback=body_callback,
- retry_on_dns_fail=retry_on_dns_fail
+ retry_on_dns_fail=retry_on_dns_fail,
+ timeout=timeout,
)
if 200 <= response.code < 300:
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 5d4b7843f3..1ba073884b 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -19,6 +19,7 @@ from ._base import SQLBaseStore, cached
from syutil.base64util import encode_base64
import logging
+from Queue import PriorityQueue, Empty
logger = logging.getLogger(__name__)
@@ -330,12 +331,13 @@ class EventFederationStore(SQLBaseStore):
" WHERE event_id = ? AND room_id = ?"
" )"
" AND NOT EXISTS ("
- " SELECT 1 FROM events WHERE event_id = ? AND room_id = ?"
+ " SELECT 1 FROM events WHERE event_id = ? AND room_id = ? "
+ " AND outlier = ?"
" )"
)
txn.executemany(query, [
- (e_id, room_id, e_id, room_id, e_id, room_id, )
+ (e_id, room_id, e_id, room_id, e_id, room_id, False)
for e_id, _ in prev_events
])
@@ -362,7 +364,11 @@ class EventFederationStore(SQLBaseStore):
return self.runInteraction(
"get_backfill_events",
self._get_backfill_events, room_id, event_list, limit
- ).addCallback(self._get_events)
+ ).addCallback(
+ self._get_events
+ ).addCallback(
+ lambda l: sorted(l, key=lambda e: -e.depth)
+ )
def _get_backfill_events(self, txn, room_id, event_list, limit):
logger.debug(
@@ -370,43 +376,54 @@ class EventFederationStore(SQLBaseStore):
room_id, repr(event_list), limit
)
- event_results = event_list
+ event_results = set()
- front = event_list
+ # We want to make sure that we do a breadth-first, "depth" ordered
+ # search.
query = (
- "SELECT prev_event_id FROM event_edges "
- "WHERE room_id = ? AND event_id = ? "
- "LIMIT ?"
+ "SELECT depth, prev_event_id FROM event_edges"
+ " INNER JOIN events"
+ " ON prev_event_id = events.event_id"
+ " AND event_edges.room_id = events.room_id"
+ " WHERE event_edges.room_id = ? AND event_edges.event_id = ?"
+ " AND event_edges.is_state = ?"
+ " LIMIT ?"
)
- # We iterate through all event_ids in `front` to select their previous
- # events. These are dumped in `new_front`.
- # We continue until we reach the limit *or* new_front is empty (i.e.,
- # we've run out of things to select
- while front and len(event_results) < limit:
+ queue = PriorityQueue()
- new_front = []
- for event_id in front:
- logger.debug(
- "_backfill_interaction: id=%s",
- event_id
- )
+ for event_id in event_list:
+ depth = self._simple_select_one_onecol_txn(
+ txn,
+ table="events",
+ keyvalues={
+ "event_id": event_id,
+ },
+ retcol="depth"
+ )
- txn.execute(
- query,
- (room_id, event_id, limit - len(event_results))
- )
+ queue.put((-depth, event_id))
- for row in txn.fetchall():
- logger.debug(
- "_backfill_interaction: got id=%s",
- *row
- )
- new_front.append(row[0])
+ while not queue.empty() and len(event_results) < limit:
+ try:
+ _, event_id = queue.get_nowait()
+ except Empty:
+ break
- front = new_front
- event_results += new_front
+ if event_id in event_results:
+ continue
+
+ event_results.add(event_id)
+
+ txn.execute(
+ query,
+ (room_id, event_id, False, limit - len(event_results))
+ )
+
+ for row in txn.fetchall():
+ if row[1] not in event_results:
+ queue.put((-row[0], row[1]))
return event_results
@@ -472,3 +489,4 @@ class EventFederationStore(SQLBaseStore):
query = "DELETE FROM event_forward_extremities WHERE room_id = ?"
txn.execute(query, (room_id,))
+ txn.call_after(self.get_latest_event_ids_in_room.invalidate, room_id)
diff --git a/tests/appservice/test_appservice.py b/tests/appservice/test_appservice.py
index 62149d6902..8ce8dc0a87 100644
--- a/tests/appservice/test_appservice.py
+++ b/tests/appservice/test_appservice.py
@@ -217,18 +217,9 @@ class ApplicationServiceTestCase(unittest.TestCase):
_regex("@irc_.*")
)
join_list = [
- Mock(
- type="m.room.member", room_id="!foo:bar", sender="@alice:here",
- state_key="@alice:here"
- ),
- Mock(
- type="m.room.member", room_id="!foo:bar", sender="@irc_fo:here",
- state_key="@irc_fo:here" # AS user
- ),
- Mock(
- type="m.room.member", room_id="!foo:bar", sender="@bob:here",
- state_key="@bob:here"
- )
+ "@alice:here",
+ "@irc_fo:here", # AS user
+ "@bob:here",
]
self.event.sender = "@xmpp_foobar:matrix.org"
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index ee773797e7..12cf5747a2 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -624,6 +624,7 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase):
"""
PRESENCE_LIST = {
'apple': [ "@banana:test", "@clementine:test" ],
+ 'banana': [ "@apple:test" ],
}
@defer.inlineCallbacks
@@ -836,12 +837,7 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase):
@defer.inlineCallbacks
def test_recv_remote(self):
- # TODO(paul): Gut-wrenching
- potato_set = self.handler._remote_recvmap.setdefault(self.u_potato,
- set())
- potato_set.add(self.u_apple)
-
- self.room_members = [self.u_banana, self.u_potato]
+ self.room_members = [self.u_apple, self.u_banana, self.u_potato]
self.assertEquals(self.event_source.get_current_key(), 0)
@@ -886,11 +882,8 @@ class PresencePushTestCase(MockedDatastorePresenceTestCase):
@defer.inlineCallbacks
def test_recv_remote_offline(self):
""" Various tests relating to SYN-261 """
- potato_set = self.handler._remote_recvmap.setdefault(self.u_potato,
- set())
- potato_set.add(self.u_apple)
- self.room_members = [self.u_banana, self.u_potato]
+ self.room_members = [self.u_apple, self.u_banana, self.u_potato]
self.assertEquals(self.event_source.get_current_key(), 0)
diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py
index 21f42b3d3e..523b30cf8a 100644
--- a/tests/rest/client/v1/test_presence.py
+++ b/tests/rest/client/v1/test_presence.py
@@ -297,6 +297,9 @@ class PresenceEventStreamTestCase(unittest.TestCase):
else:
return []
hs.handlers.room_member_handler.get_joined_rooms_for_user = get_rooms_for_user
+ hs.handlers.room_member_handler.get_room_members = (
+ lambda r: self.room_members if r == "a-room" else []
+ )
self.mock_datastore = hs.get_datastore()
self.mock_datastore.get_app_service_by_token = Mock(return_value=None)
|