diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index e501251b6e..657935d1ac 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -207,10 +207,6 @@ class FederationServer(FederationBase):
edu.content
)
- pdu_failures = getattr(transaction, "pdu_failures", [])
- for fail in pdu_failures:
- logger.info("Got failure %r", fail)
-
response = {
"pdus": pdu_results,
}
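The block removed above read `pdu_failures` defensively, so transactions that omit the key were already handled; deleting the loop just stops logging a field nothing sends any more. A minimal sketch (a stand-in class, not Synapse's real `Transaction`) of why the removal is safe:

```python
class FakeTransaction(object):
    """Stand-in for synapse.federation.units.Transaction."""
    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            setattr(self, key, value)

# A transaction that never carried the attribute behaves identically
# before and after this change: the old getattr fell back to [].
txn = FakeTransaction(pdus=[{"event_id": "$abc"}], edus=[])
pdu_failures = getattr(txn, "pdu_failures", [])
assert pdu_failures == []  # the removed loop body would never have run
```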
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 5157c3860d..0bb468385d 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -62,8 +62,6 @@ class FederationRemoteSendQueue(object):
self.edus = SortedDict() # stream position -> Edu
- self.failures = SortedDict() # stream position -> (destination, Failure)
-
self.device_messages = SortedDict() # stream position -> destination
self.pos = 1
@@ -79,7 +77,7 @@ class FederationRemoteSendQueue(object):
for queue_name in [
"presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed",
- "edus", "failures", "device_messages", "pos_time",
+ "edus", "device_messages", "pos_time",
]:
register(queue_name, getattr(self, queue_name))
@@ -149,12 +147,6 @@ class FederationRemoteSendQueue(object):
for key in keys[:i]:
del self.edus[key]
- # Delete things out of failure map
- keys = self.failures.keys()
- i = self.failures.bisect_left(position_to_delete)
- for key in keys[:i]:
- del self.failures[key]
-
# Delete things out of device map
keys = self.device_messages.keys()
i = self.device_messages.bisect_left(position_to_delete)
@@ -204,13 +196,6 @@ class FederationRemoteSendQueue(object):
self.notifier.on_new_replication_data()
- def send_failure(self, failure, destination):
- """As per TransactionQueue"""
- pos = self._next_pos()
-
- self.failures[pos] = (destination, str(failure))
- self.notifier.on_new_replication_data()
-
def send_device_messages(self, destination):
"""As per TransactionQueue"""
pos = self._next_pos()
@@ -285,17 +270,6 @@ class FederationRemoteSendQueue(object):
for (pos, edu) in edus:
rows.append((pos, EduRow(edu)))
- # Fetch changed failures
- i = self.failures.bisect_right(from_token)
- j = self.failures.bisect_right(to_token) + 1
- failures = self.failures.items()[i:j]
-
- for (pos, (destination, failure)) in failures:
- rows.append((pos, FailureRow(
- destination=destination,
- failure=failure,
- )))
-
# Fetch changed device messages
i = self.device_messages.bisect_right(from_token)
j = self.device_messages.bisect_right(to_token) + 1
@@ -417,34 +391,6 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", (
buff.edus.setdefault(self.edu.destination, []).append(self.edu)
-class FailureRow(BaseFederationRow, namedtuple("FailureRow", (
- "destination", # str
- "failure",
-))):
- """Streams failures to a remote server. Failures are issued when there was
- something wrong with a transaction the remote sent us, e.g. it included
- an event that was invalid.
- """
-
- TypeId = "f"
-
- @staticmethod
- def from_data(data):
- return FailureRow(
- destination=data["destination"],
- failure=data["failure"],
- )
-
- def to_data(self):
- return {
- "destination": self.destination,
- "failure": self.failure,
- }
-
- def add_to_buffer(self, buff):
- buff.failures.setdefault(self.destination, []).append(self.failure)
-
-
class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", (
"destination", # str
))):
@@ -471,7 +417,6 @@ TypeToRow = {
PresenceRow,
KeyedEduRow,
EduRow,
- FailureRow,
DeviceRow,
)
}
@@ -481,7 +426,6 @@ ParsedFederationStreamData = namedtuple("ParsedFederationStreamData", (
"presence", # list(UserPresenceState)
"keyed_edus", # dict of destination -> { key -> Edu }
"edus", # dict of destination -> [Edu]
- "failures", # dict of destination -> [failures]
"device_destinations", # set of destinations
))
@@ -503,7 +447,6 @@ def process_rows_for_federation(transaction_queue, rows):
presence=[],
keyed_edus={},
edus={},
- failures={},
device_destinations=set(),
)
@@ -532,9 +475,5 @@ def process_rows_for_federation(transaction_queue, rows):
edu.destination, edu.edu_type, edu.content, key=None,
)
- for destination, failure_list in iteritems(buff.failures):
- for failure in failure_list:
- transaction_queue.send_failure(destination, failure)
-
for destination in buff.device_destinations:
transaction_queue.send_device_messages(destination)
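For context on what `FailureRow` was plugging into: every federation replication row follows the same small contract (a `TypeId` tag for the wire format, `from_data`/`to_data` for (de)serialisation, and `add_to_buffer` to fold the row into a per-batch buffer), which the surviving `EduRow` and `DeviceRow` still implement. A simplified sketch of that contract, with stand-in names rather than Synapse's `BaseFederationRow`:

```python
from collections import namedtuple

class DemoDeviceRow(namedtuple("DemoDeviceRow", ("destination",))):
    """Simplified stand-in for a BaseFederationRow subclass."""

    TypeId = "d"  # single-character tag used on the replication stream

    @staticmethod
    def from_data(data):
        return DemoDeviceRow(destination=data["destination"])

    def to_data(self):
        return {"destination": self.destination}

    def add_to_buffer(self, buff):
        buff["device_destinations"].add(self.destination)

row = DemoDeviceRow.from_data({"destination": "remote.example.org"})
buff = {"device_destinations": set()}
row.add_to_buffer(buff)
assert buff["device_destinations"] == {"remote.example.org"}
```

With `FailureRow` deleted, the `"f"` TypeId also disappears from `TypeToRow` below, so no row on the stream maps to it any more.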
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 6996d6b695..78f9d40a3a 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -116,9 +116,6 @@ class TransactionQueue(object):
),
)
- # destination -> list of tuple(failure, deferred)
- self.pending_failures_by_dest = {}
-
# destination -> stream_id of last successfully sent to-device message.
# NB: may be a long or an int.
self.last_device_stream_id_by_dest = {}
@@ -382,19 +379,6 @@ class TransactionQueue(object):
self._attempt_new_transaction(destination)
- def send_failure(self, failure, destination):
- if destination == self.server_name or destination == "localhost":
- return
-
- if not self.can_send_to(destination):
- return
-
- self.pending_failures_by_dest.setdefault(
- destination, []
- ).append(failure)
-
- self._attempt_new_transaction(destination)
-
def send_device_messages(self, destination):
if destination == self.server_name or destination == "localhost":
return
@@ -469,7 +453,6 @@ class TransactionQueue(object):
pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
pending_edus = self.pending_edus_by_dest.pop(destination, [])
pending_presence = self.pending_presence_by_dest.pop(destination, {})
- pending_failures = self.pending_failures_by_dest.pop(destination, [])
pending_edus.extend(
self.pending_edus_keyed_by_dest.pop(destination, {}).values()
@@ -497,7 +480,7 @@ class TransactionQueue(object):
logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
destination, len(pending_pdus))
- if not pending_pdus and not pending_edus and not pending_failures:
+ if not pending_pdus and not pending_edus:
logger.debug("TX [%s] Nothing to send", destination)
self.last_device_stream_id_by_dest[destination] = (
device_stream_id
@@ -507,7 +490,7 @@ class TransactionQueue(object):
# END CRITICAL SECTION
success = yield self._send_new_transaction(
- destination, pending_pdus, pending_edus, pending_failures,
+ destination, pending_pdus, pending_edus,
)
if success:
sent_transactions_counter.inc()
@@ -584,14 +567,12 @@ class TransactionQueue(object):
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
- def _send_new_transaction(self, destination, pending_pdus, pending_edus,
- pending_failures):
+ def _send_new_transaction(self, destination, pending_pdus, pending_edus):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
pdus = [x[0] for x in pending_pdus]
edus = pending_edus
- failures = [x.get_dict() for x in pending_failures]
success = True
@@ -601,11 +582,10 @@ class TransactionQueue(object):
logger.debug(
"TX [%s] {%s} Attempting new transaction"
- " (pdus: %d, edus: %d, failures: %d)",
+ " (pdus: %d, edus: %d)",
destination, txn_id,
len(pdus),
len(edus),
- len(failures)
)
logger.debug("TX [%s] Persisting transaction...", destination)
@@ -617,7 +597,6 @@ class TransactionQueue(object):
destination=destination,
pdus=pdus,
edus=edus,
- pdu_failures=failures,
)
self._next_txn_id += 1
@@ -627,12 +606,11 @@ class TransactionQueue(object):
logger.debug("TX [%s] Persisted transaction", destination)
logger.info(
"TX [%s] {%s} Sending transaction [%s],"
- " (PDUs: %d, EDUs: %d, failures: %d)",
+ " (PDUs: %d, EDUs: %d)",
destination, txn_id,
transaction.transaction_id,
len(pdus),
len(edus),
- len(failures),
)
# Actually send the transaction
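The sending path is now simpler to reason about: a transaction is attempted only when PDUs or EDUs are queued for the destination. A simplified sketch of that control flow, using assumed names rather than the real `TransactionQueue`:

```python
pending_pdus_by_dest = {}
pending_edus_by_dest = {}

def attempt_new_transaction(destination, send):
    # Pop everything queued for this destination (the real code also
    # merges in keyed EDUs and presence).
    pending_pdus = pending_pdus_by_dest.pop(destination, [])
    pending_edus = pending_edus_by_dest.pop(destination, [])

    if not pending_pdus and not pending_edus:
        return False  # "Nothing to send"

    # Sort PDUs by their order field, as _send_new_transaction does.
    pending_pdus.sort(key=lambda t: t[1])
    pdus = [pdu for (pdu, _order) in pending_pdus]
    return send(destination, pdus, pending_edus)

pending_edus_by_dest.setdefault("remote.example.org", []).append(
    {"edu_type": "m.typing", "content": {}}
)
assert attempt_new_transaction("remote.example.org", lambda d, p, e: True)
assert not attempt_new_transaction("remote.example.org", lambda d, p, e: True)
```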
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 8574898f0c..3b5ea9515a 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -283,11 +283,10 @@ class FederationSendServlet(BaseFederationServlet):
)
logger.info(
- "Received txn %s from %s. (PDUs: %d, EDUs: %d, failures: %d)",
+ "Received txn %s from %s. (PDUs: %d, EDUs: %d)",
transaction_id, origin,
len(transaction_data.get("pdus", [])),
len(transaction_data.get("edus", [])),
- len(transaction_data.get("failures", [])),
)
# We should ideally be getting this from the security layer.
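The servlet counts fields with `dict.get` defaults, so a transaction that still carries a stray `failures` key, or omits `edus` entirely, logs without raising. A toy illustration (the transaction ID and server name are made up):

```python
transaction_data = {"pdus": [{"event_id": "$a"}], "failures": []}
print(
    "Received txn %s from %s. (PDUs: %d, EDUs: %d)" % (
        "916d630ea616342b42e98a3be0b74113",
        "remote.example.org",
        len(transaction_data.get("pdus", [])),
        len(transaction_data.get("edus", [])),
    )
)
```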
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index bb1b3b13f7..c5ab14314e 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -73,7 +73,6 @@ class Transaction(JsonEncodedObject):
"previous_ids",
"pdus",
"edus",
- "pdu_failures",
]
internal_keys = [
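Removing `pdu_failures` from `valid_keys` is what actually strips the field from the wire format: `Transaction` is a `JsonEncodedObject`, and only whitelisted keys survive construction and serialisation. A simplified stand-in (not the real `JsonEncodedObject`) showing the effect:

```python
class DemoJsonEncodedObject(object):
    """Simplified stand-in for synapse.federation.units.JsonEncodedObject."""

    valid_keys = ["transaction_id", "pdus", "edus"]

    def __init__(self, **kwargs):
        for key, value in kwargs.items():
            if key in self.valid_keys:
                setattr(self, key, value)

    def get_dict(self):
        return {
            k: getattr(self, k) for k in self.valid_keys if hasattr(self, k)
        }

txn = DemoJsonEncodedObject(
    transaction_id="txn1", pdus=[], edus=[], pdu_failures=["ignored"]
)
assert "pdu_failures" not in txn.get_dict()  # silently dropped
```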
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 003b848c00..7b7804d9b2 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -15,6 +15,7 @@
# limitations under the License.
"""Contains functions for performing events on rooms."""
+import itertools
import logging
import math
import string
@@ -401,7 +402,7 @@ class RoomContextHandler(object):
self.store = hs.get_datastore()
@defer.inlineCallbacks
- def get_event_context(self, user, room_id, event_id, limit):
+ def get_event_context(self, user, room_id, event_id, limit, event_filter):
"""Retrieves events, pagination tokens and state around a given event
in a room.
@@ -411,6 +412,8 @@ class RoomContextHandler(object):
event_id (str)
limit (int): The maximum number of events to return in total
(excluding state).
+ event_filter (Filter|None): the filter to apply to the events returned
+ (excluding the target event_id)
Returns:
dict, or None if the event isn't found
@@ -443,7 +446,7 @@ class RoomContextHandler(object):
)
results = yield self.store.get_events_around(
- room_id, event_id, before_limit, after_limit
+ room_id, event_id, before_limit, after_limit, event_filter
)
results["events_before"] = yield filter_evts(results["events_before"])
@@ -455,8 +458,23 @@ class RoomContextHandler(object):
else:
last_event_id = event_id
+ types = None
+ filtered_types = None
+ if event_filter and event_filter.lazy_load_members():
+ members = set(ev.sender for ev in itertools.chain(
+ results["events_before"],
+ (results["event"],),
+ results["events_after"],
+ ))
+ filtered_types = [EventTypes.Member]
+ types = [(EventTypes.Member, member) for member in members]
+
+ # XXX: why do we return the state as of the last event rather than the
+ # first? Shouldn't we be consistent with /sync?
+ # https://github.com/matrix-org/matrix-doc/issues/687
+
state = yield self.store.get_state_for_events(
- [last_event_id], None
+ [last_event_id], types, filtered_types=filtered_types,
)
results["state"] = list(state[last_event_id].values())
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 69ae9731d5..c464adbd0b 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -287,7 +287,7 @@ class SearchHandler(BaseHandler):
contexts = {}
for event in allowed_events:
res = yield self.store.get_events_around(
- event.room_id, event.event_id, before_limit, after_limit
+ event.room_id, event.event_id, before_limit, after_limit,
)
logger.info(
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index b7bd878c90..13c331550b 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -531,11 +531,20 @@ class RoomEventContextServlet(ClientV1RestServlet):
limit = parse_integer(request, "limit", default=10)
+ # picking the API shape for symmetry with /messages
+ filter_bytes = parse_string(request, "filter")
+ if filter_bytes:
+ filter_json = urlparse.unquote(filter_bytes).decode("UTF-8")
+ event_filter = Filter(json.loads(filter_json))
+ else:
+ event_filter = None
+
results = yield self.room_context_handler.get_event_context(
requester.user,
room_id,
event_id,
limit,
+ event_filter,
)
if not results:
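For illustration, this is how a client might exercise the new parameter; the endpoint shape mirrors `/messages`, while the room ID, event ID, and filter contents below are made up:

```python
import json
try:
    from urllib.parse import quote  # Python 3
except ImportError:
    from urllib import quote  # Python 2

room_filter = {"types": ["m.room.message"], "lazy_load_members": True}
path = "/_matrix/client/r0/rooms/%s/context/%s" % (
    quote("!room:example.org"), quote("$event:example.org"),
)
# The filter JSON is URL-encoded into the query string.
url = "%s?limit=10&filter=%s" % (path, quote(json.dumps(room_filter)))
print(url)
```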
diff --git a/synapse/secrets.py b/synapse/secrets.py
index f397daaa5e..f05e9ea535 100644
--- a/synapse/secrets.py
+++ b/synapse/secrets.py
@@ -20,17 +20,16 @@ See https://docs.python.org/3/library/secrets.html#module-secrets for the API
used in Python 3.6, and the API emulated in Python 2.7.
"""
-import six
+import sys
-if six.PY3:
+# secrets is available since Python 3.6
+if sys.version_info[0:2] >= (3, 6):
import secrets
def Secrets():
return secrets
-
else:
-
import os
import binascii
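The Python 2 branch's body is unchanged by this diff, so it is not shown; it emulates the two functions Synapse needs from `secrets`. A minimal sketch of such an emulation, under the assumption that `os.urandom` is the entropy source:

```python
import binascii
import os

class DemoSecrets(object):
    """Sketch of a secrets-module shim for pre-3.6 Pythons."""

    def token_bytes(self, nbytes=32):
        return os.urandom(nbytes)  # CSPRNG-backed on supported platforms

    def token_hex(self, nbytes=32):
        return binascii.hexlify(self.token_bytes(nbytes))

s = DemoSecrets()
assert len(s.token_bytes(16)) == 16
assert len(s.token_hex(16)) == 32  # two hex digits per byte
```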
diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index ba88a54979..3bd63cd195 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -66,6 +66,7 @@ class DataStore(RoomMemberStore, RoomStore,
PresenceStore, TransactionStore,
DirectoryStore, KeyStore, StateStore, SignatureStore,
ApplicationServiceStore,
+ EventsStore,
EventFederationStore,
MediaRepositoryStore,
RejectionsStore,
@@ -73,7 +74,6 @@ class DataStore(RoomMemberStore, RoomStore,
PusherStore,
PushRuleStore,
ApplicationServiceTransactionStore,
- EventsStore,
ReceiptsStore,
EndToEndKeyStore,
SearchStore,
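`EventsStore` moves up the base list because it now subclasses `EventFederationStore` (see the `synapse/storage/events.py` hunk below), and Python's C3 linearisation requires a class to precede its own bases. A toy demonstration of the `TypeError` the old ordering would raise:

```python
class EventFederationStoreDemo(object):
    pass

class EventsStoreDemo(EventFederationStoreDemo):
    pass

# Subclass listed before its base: a valid MRO.
class GoodDataStore(EventsStoreDemo, EventFederationStoreDemo):
    pass

# Base listed before its subclass: C3 linearisation fails.
try:
    BadDataStore = type(
        "BadDataStore", (EventFederationStoreDemo, EventsStoreDemo), {}
    )
except TypeError as e:
    print("MRO conflict:", e)
```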
diff --git a/synapse/storage/appservice.py b/synapse/storage/appservice.py
index 9f12b360bc..31248d5e06 100644
--- a/synapse/storage/appservice.py
+++ b/synapse/storage/appservice.py
@@ -22,7 +22,7 @@ from twisted.internet import defer
from synapse.appservice import AppServiceTransaction
from synapse.config.appservice import load_appservices
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
from ._base import SQLBaseStore
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index 5d3ee90017..8bd35df119 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -25,7 +25,7 @@ from twisted.internet import defer
from synapse.api.errors import StoreError
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
from synapse.storage.signatures import SignatureWorkerStore
from synapse.util.caches.descriptors import cached
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 2f482af3a1..c98e524ba1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -34,6 +34,8 @@ from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.background_updates import BackgroundUpdateStore
+from synapse.storage.event_federation import EventFederationStore
from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken, get_domain_from_id
from synapse.util.async import ObservableDeferred
@@ -193,7 +195,9 @@ def _retry_on_integrity_error(func):
return f
-class EventsStore(EventsWorkerStore):
+# inherits from EventFederationStore so that we can call _update_backward_extremities
+# and _handle_mult_prev_events (though arguably those could both be moved in here)
+class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore):
EVENT_ORIGIN_SERVER_TS_NAME = "event_origin_server_ts"
EVENT_FIELDS_SENDER_URL_UPDATE_NAME = "event_fields_sender_url"
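The repeated `synapse.storage.events` to `synapse.storage.events_worker` import rewrites elsewhere in this diff exist to keep the import graph acyclic: `events.py` now imports `event_federation.py`, so anything `event_federation.py` and its dependents need from the events code must come from the read-only `events_worker.py` instead. A runnable miniature of the resulting one-way topology (the `demo_`-prefixed module names are invented for the sketch):

```python
import sys
import types

def make_module(name, source):
    mod = types.ModuleType(name)
    sys.modules[name] = mod  # register first so later imports can find it
    exec(source, mod.__dict__)
    return mod

make_module("demo_events_worker",
            "class EventsWorkerStore(object): pass")
make_module("demo_event_federation",
            "from demo_events_worker import EventsWorkerStore\n"
            "class EventFederationStore(EventsWorkerStore): pass")
make_module("demo_events",
            "from demo_event_federation import EventFederationStore\n"
            "class EventsStore(EventFederationStore): pass")

# All imports point one way (events -> event_federation -> events_worker);
# importing from demo_events inside demo_event_federation would close a loop.
assert issubclass(
    sys.modules["demo_events"].EventsStore,
    sys.modules["demo_events_worker"].EventsWorkerStore,
)
```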
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 027bf8c85e..10dce21cea 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -24,7 +24,7 @@ from canonicaljson import json
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import get_domain_from_id
from synapse.util.async import Linearizer
from synapse.util.caches import intern_string
diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py
index 66856342f0..b9f2b74ac6 100644
--- a/synapse/storage/stream.py
+++ b/synapse/storage/stream.py
@@ -43,7 +43,7 @@ from twisted.internet import defer
from synapse.storage._base import SQLBaseStore
from synapse.storage.engines import PostgresEngine
-from synapse.storage.events import EventsWorkerStore
+from synapse.storage.events_worker import EventsWorkerStore
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
@@ -527,7 +527,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
)
@defer.inlineCallbacks
- def get_events_around(self, room_id, event_id, before_limit, after_limit):
+ def get_events_around(
+ self, room_id, event_id, before_limit, after_limit, event_filter=None,
+ ):
"""Retrieve events and pagination tokens around a given event in a
room.
@@ -536,6 +538,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
event_id (str)
before_limit (int)
after_limit (int)
+ event_filter (Filter|None)
Returns:
dict
@@ -543,7 +546,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
results = yield self.runInteraction(
"get_events_around", self._get_events_around_txn,
- room_id, event_id, before_limit, after_limit
+ room_id, event_id, before_limit, after_limit, event_filter,
)
events_before = yield self._get_events(
@@ -563,7 +566,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
"end": results["after"]["token"],
})
- def _get_events_around_txn(self, txn, room_id, event_id, before_limit, after_limit):
+ def _get_events_around_txn(
+ self, txn, room_id, event_id, before_limit, after_limit, event_filter,
+ ):
"""Retrieves event_ids and pagination tokens around a given event in a
room.
@@ -572,6 +577,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
event_id (str)
before_limit (int)
after_limit (int)
+ event_filter (Filter|None)
Returns:
dict
@@ -601,11 +607,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
rows, start_token = self._paginate_room_events_txn(
txn, room_id, before_token, direction='b', limit=before_limit,
+ event_filter=event_filter,
)
events_before = [r.event_id for r in rows]
rows, end_token = self._paginate_room_events_txn(
txn, room_id, after_token, direction='f', limit=after_limit,
+ event_filter=event_filter,
)
events_after = [r.event_id for r in rows]
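Passing `event_filter` down to `_paginate_room_events_txn` means the filter is applied inside the pagination query itself, and `event_filter=None` (the default, as used by the search handler) preserves the old behaviour. An illustrative sketch, not Synapse's actual helper, of how a filter can be folded into SQL:

```python
def demo_filter_to_clause(event_filter):
    """Turn a (simplified) filter dict into a SQL fragment and bind args."""
    if not event_filter:
        return "", []  # None/empty filter: no extra conditions
    clauses, args = [], []
    if event_filter.get("types"):
        clauses.append(
            "type IN (%s)" % ",".join("?" * len(event_filter["types"]))
        )
        args.extend(event_filter["types"])
    if event_filter.get("senders"):
        clauses.append(
            "sender IN (%s)" % ",".join("?" * len(event_filter["senders"]))
        )
        args.extend(event_filter["senders"])
    return " AND ".join(clauses), args

clause, args = demo_filter_to_clause({"types": ["m.room.message"]})
assert clause == "type IN (?)" and args == ["m.room.message"]
assert demo_filter_to_clause(None) == ("", [])
```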