diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py
index d75c366834..3faa571dd9 100644
--- a/synapse/storage/__init__.py
+++ b/synapse/storage/__init__.py
@@ -37,7 +37,6 @@ from .registration import RegistrationStore
from .room import RoomStore
from .roommember import RoomMemberStore
from .stream import StreamStore
-from .pdu import StatePduStore, PduStore, PdusTable
from .transactions import TransactionStore
from .keys import KeyStore
from .event_federation import EventFederationStore
@@ -60,7 +59,6 @@ logger = logging.getLogger(__name__)
SCHEMAS = [
"transactions",
- "pdu",
"users",
"profiles",
"presence",
@@ -89,7 +87,7 @@ class _RollbackButIsFineException(Exception):
class DataStore(RoomMemberStore, RoomStore,
RegistrationStore, StreamStore, ProfileStore, FeedbackStore,
- PresenceStore, PduStore, StatePduStore, TransactionStore,
+ PresenceStore, TransactionStore,
DirectoryStore, KeyStore, StateStore, SignatureStore,
EventFederationStore, ):
@@ -150,68 +148,12 @@ class DataStore(RoomMemberStore, RoomStore,
def _persist_pdu_event_txn(self, txn, pdu=None, event=None,
backfilled=False, stream_ordering=None,
is_new_state=True):
- if pdu is not None:
- self._persist_event_pdu_txn(txn, pdu)
if event is not None:
return self._persist_event_txn(
txn, event, backfilled, stream_ordering,
is_new_state=is_new_state,
)
- def _persist_event_pdu_txn(self, txn, pdu):
- cols = dict(pdu.__dict__)
- unrec_keys = dict(pdu.unrecognized_keys)
- del cols["hashes"]
- del cols["signatures"]
- del cols["content"]
- del cols["prev_pdus"]
- cols["content_json"] = json.dumps(pdu.content)
-
- unrec_keys.update({
- k: v for k, v in cols.items()
- if k not in PdusTable.fields
- })
-
- cols["unrecognized_keys"] = json.dumps(unrec_keys)
-
- cols["ts"] = cols.pop("origin_server_ts")
-
- logger.debug("Persisting: %s", repr(cols))
-
- for hash_alg, hash_base64 in pdu.hashes.items():
- hash_bytes = decode_base64(hash_base64)
- self._store_pdu_content_hash_txn(
- txn, pdu.pdu_id, pdu.origin, hash_alg, hash_bytes,
- )
-
- signatures = pdu.signatures.get(pdu.origin, {})
-
- for key_id, signature_base64 in signatures.items():
- signature_bytes = decode_base64(signature_base64)
- self._store_pdu_origin_signature_txn(
- txn, pdu.pdu_id, pdu.origin, key_id, signature_bytes,
- )
-
- for prev_pdu_id, prev_origin, prev_hashes in pdu.prev_pdus:
- for alg, hash_base64 in prev_hashes.items():
- hash_bytes = decode_base64(hash_base64)
- self._store_prev_pdu_hash_txn(
- txn, pdu.pdu_id, pdu.origin, prev_pdu_id, prev_origin, alg,
- hash_bytes
- )
-
- (ref_alg, ref_hash_bytes) = compute_pdu_event_reference_hash(pdu)
- self._store_pdu_reference_hash_txn(
- txn, pdu.pdu_id, pdu.origin, ref_alg, ref_hash_bytes
- )
-
- if pdu.is_state:
- self._persist_state_txn(txn, pdu.prev_pdus, cols)
- else:
- self._persist_pdu_txn(txn, pdu.prev_pdus, cols)
-
- self._update_min_depth_for_context_txn(txn, pdu.context, pdu.depth)
-
@log_function
def _persist_event_txn(self, txn, event, backfilled, stream_ordering=None,
is_new_state=True):
|