1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
2169
2170
2171
2172
2173
2174
2175
2176
2177
2178
2179
2180
2181
2182
2183
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
2214
2215
2216
2217
2218
2219
2220
2221
2222
2223
2224
2225
2226
//! [`Chain`] implements a single non-finalized blockchain,
//! starting at the finalized tip.

use std::{
    cmp::Ordering,
    collections::{BTreeMap, BTreeSet, HashMap, HashSet},
    ops::{Deref, DerefMut, RangeInclusive},
    sync::Arc,
};

use mset::MultiSet;
use tracing::instrument;

use zebra_chain::{
    amount::{Amount, NegativeAllowed, NonNegative},
    block::{self, Height},
    history_tree::HistoryTree,
    orchard,
    parallel::tree::NoteCommitmentTrees,
    parameters::Network,
    primitives::Groth16Proof,
    sapling, sprout,
    subtree::{NoteCommitmentSubtree, NoteCommitmentSubtreeData, NoteCommitmentSubtreeIndex},
    transaction::Transaction::*,
    transaction::{self, Transaction},
    transparent,
    value_balance::ValueBalance,
    work::difficulty::PartialCumulativeWork,
};

use crate::{
    request::Treestate, service::check, ContextuallyVerifiedBlock, HashOrHeight, OutputLocation,
    TransactionLocation, ValidateContextError,
};

use self::index::TransparentTransfers;

pub mod index;

/// A single non-finalized partial chain, from the child of the finalized tip,
/// to a non-finalized chain tip.
#[derive(Clone, Debug, Default)]
pub struct Chain {
    // Config
    //
    /// The configured network for this chain.
    network: Network,

    /// The internal state of this chain.
    inner: ChainInner,

    // Diagnostics
    //
    /// The last height this chain forked at. Diagnostics only.
    ///
    /// This field is only used for metrics, it is not consensus-critical, and it is not checked
    /// for equality.
    ///
    /// We keep the same last fork height in both sides of a clone, because every new block clones
    /// a chain, even if it's just growing that chain.
    pub(super) last_fork_height: Option<Height>,
    // # Note
    //
    // Most diagnostics are implemented on the NonFinalizedState, rather than each chain.
    // Some diagnostics only use the best chain, and others need to modify the Chain state,
    // but that's difficult with `Arc<Chain>`s.
}

/// The internal state of [`Chain`].
#[derive(Clone, Debug, PartialEq, Eq, Default)]
pub struct ChainInner {
    // Blocks, heights, hashes, and transaction locations
    //
    /// The contextually valid blocks which form this non-finalized partial chain, in height order.
    pub(crate) blocks: BTreeMap<block::Height, ContextuallyVerifiedBlock>,

    /// An index of block heights for each block hash in `blocks`.
    pub height_by_hash: HashMap<block::Hash, block::Height>,

    /// An index of [`TransactionLocation`]s for each transaction hash in `blocks`.
    pub tx_loc_by_hash: HashMap<transaction::Hash, TransactionLocation>,

    // Transparent outputs and spends
    //
    /// The [`transparent::Utxo`]s created by `blocks`.
    ///
    /// Note that these UTXOs may not be unspent.
    /// Outputs can be spent by later transactions or blocks in the chain.
    //
    // TODO: replace OutPoint with OutputLocation?
    pub(crate) created_utxos: HashMap<transparent::OutPoint, transparent::OrderedUtxo>,
    /// The [`transparent::OutPoint`]s spent by `blocks`,
    /// including those created by earlier transactions or blocks in the chain.
    pub(crate) spent_utxos: HashSet<transparent::OutPoint>,

    // Note commitment trees
    //
    /// The Sprout note commitment tree for each anchor.
    /// This is required for interstitial states.
    ///
    /// When a chain is forked from the finalized tip, also contains the finalized tip root.
    /// This extra root is removed when the first non-finalized block is committed.
    pub(crate) sprout_trees_by_anchor:
        HashMap<sprout::tree::Root, Arc<sprout::tree::NoteCommitmentTree>>,
    /// The Sprout note commitment tree for each height.
    ///
    /// When a chain is forked from the finalized tip, also contains the finalized tip tree.
    /// This extra tree is removed when the first non-finalized block is committed.
    pub(crate) sprout_trees_by_height:
        BTreeMap<block::Height, Arc<sprout::tree::NoteCommitmentTree>>,

    /// The Sapling note commitment tree for each height.
    ///
    /// When a chain is forked from the finalized tip, also contains the finalized tip tree.
    /// This extra tree is removed when the first non-finalized block is committed.
    pub(crate) sapling_trees_by_height:
        BTreeMap<block::Height, Arc<sapling::tree::NoteCommitmentTree>>,

    /// The Orchard note commitment tree for each height.
    ///
    /// When a chain is forked from the finalized tip, also contains the finalized tip tree.
    /// This extra tree is removed when the first non-finalized block is committed.
    pub(crate) orchard_trees_by_height:
        BTreeMap<block::Height, Arc<orchard::tree::NoteCommitmentTree>>,

    // History trees
    //
    /// The ZIP-221 history tree for each height, including all finalized blocks,
    /// and the non-finalized `blocks` below that height in this chain.
    ///
    /// When a chain is forked from the finalized tip, also contains the finalized tip tree.
    /// This extra tree is removed when the first non-finalized block is committed.
    pub(crate) history_trees_by_height: BTreeMap<block::Height, Arc<HistoryTree>>,

    // Anchors
    //
    /// The Sprout anchors created by `blocks`.
    ///
    /// When a chain is forked from the finalized tip, also contains the finalized tip root.
    /// This extra root is removed when the first non-finalized block is committed.
    pub(crate) sprout_anchors: MultiSet<sprout::tree::Root>,
    /// The Sprout anchors created by each block in `blocks`.
    ///
    /// When a chain is forked from the finalized tip, also contains the finalized tip root.
    /// This extra root is removed when the first non-finalized block is committed.
    pub(crate) sprout_anchors_by_height: BTreeMap<block::Height, sprout::tree::Root>,

    /// The Sapling anchors created by `blocks`.
    ///
    /// When a chain is forked from the finalized tip, also contains the finalized tip root.
    /// This extra root is removed when the first non-finalized block is committed.
    pub(crate) sapling_anchors: MultiSet<sapling::tree::Root>,
    /// The Sapling anchors created by each block in `blocks`.
    ///
    /// When a chain is forked from the finalized tip, also contains the finalized tip root.
    /// This extra root is removed when the first non-finalized block is committed.
    pub(crate) sapling_anchors_by_height: BTreeMap<block::Height, sapling::tree::Root>,
    /// A list of Sapling subtrees completed in the non-finalized state
    pub(crate) sapling_subtrees:
        BTreeMap<NoteCommitmentSubtreeIndex, NoteCommitmentSubtreeData<sapling::tree::Node>>,

    /// The Orchard anchors created by `blocks`.
    ///
    /// When a chain is forked from the finalized tip, also contains the finalized tip root.
    /// This extra root is removed when the first non-finalized block is committed.
    pub(crate) orchard_anchors: MultiSet<orchard::tree::Root>,
    /// The Orchard anchors created by each block in `blocks`.
    ///
    /// When a chain is forked from the finalized tip, also contains the finalized tip root.
    /// This extra root is removed when the first non-finalized block is committed.
    pub(crate) orchard_anchors_by_height: BTreeMap<block::Height, orchard::tree::Root>,
    /// A list of Orchard subtrees completed in the non-finalized state
    pub(crate) orchard_subtrees:
        BTreeMap<NoteCommitmentSubtreeIndex, NoteCommitmentSubtreeData<orchard::tree::Node>>,

    // Nullifiers
    //
    /// The Sprout nullifiers revealed by `blocks`.
    pub(crate) sprout_nullifiers: HashSet<sprout::Nullifier>,
    /// The Sapling nullifiers revealed by `blocks`.
    pub(crate) sapling_nullifiers: HashSet<sapling::Nullifier>,
    /// The Orchard nullifiers revealed by `blocks`.
    pub(crate) orchard_nullifiers: HashSet<orchard::Nullifier>,

    // Transparent Transfers
    // TODO: move to the transparent section
    //
    /// Partial transparent address index data from `blocks`.
    pub(super) partial_transparent_transfers: HashMap<transparent::Address, TransparentTransfers>,

    // Chain Work
    //
    /// The cumulative work represented by `blocks`.
    ///
    /// Since the best chain is determined by the largest cumulative work,
    /// the work represented by finalized blocks can be ignored,
    /// because they are common to all non-finalized chains.
    pub(super) partial_cumulative_work: PartialCumulativeWork,

    // Chain Pools
    //
    /// The chain value pool balances of the tip of this [`Chain`],
    /// including the block value pool changes from all finalized blocks,
    /// and the non-finalized blocks in this chain.
    ///
    /// When a new chain is created from the finalized tip,
    /// it is initialized with the finalized tip chain value pool balances.
    pub(crate) chain_value_pools: ValueBalance<NonNegative>,
}

impl Chain {
    /// Create a new Chain with the given finalized tip trees and network.
    pub(crate) fn new(
        network: &Network,
        finalized_tip_height: Height,
        sprout_note_commitment_tree: Arc<sprout::tree::NoteCommitmentTree>,
        sapling_note_commitment_tree: Arc<sapling::tree::NoteCommitmentTree>,
        orchard_note_commitment_tree: Arc<orchard::tree::NoteCommitmentTree>,
        history_tree: Arc<HistoryTree>,
        finalized_tip_chain_value_pools: ValueBalance<NonNegative>,
    ) -> Self {
        let inner = ChainInner {
            blocks: Default::default(),
            height_by_hash: Default::default(),
            tx_loc_by_hash: Default::default(),
            created_utxos: Default::default(),
            spent_utxos: Default::default(),
            sprout_anchors: MultiSet::new(),
            sprout_anchors_by_height: Default::default(),
            sprout_trees_by_anchor: Default::default(),
            sprout_trees_by_height: Default::default(),
            sapling_anchors: MultiSet::new(),
            sapling_anchors_by_height: Default::default(),
            sapling_trees_by_height: Default::default(),
            sapling_subtrees: Default::default(),
            orchard_anchors: MultiSet::new(),
            orchard_anchors_by_height: Default::default(),
            orchard_trees_by_height: Default::default(),
            orchard_subtrees: Default::default(),
            sprout_nullifiers: Default::default(),
            sapling_nullifiers: Default::default(),
            orchard_nullifiers: Default::default(),
            partial_transparent_transfers: Default::default(),
            partial_cumulative_work: Default::default(),
            history_trees_by_height: Default::default(),
            chain_value_pools: finalized_tip_chain_value_pools,
        };

        let mut chain = Self {
            network: network.clone(),
            inner,
            last_fork_height: None,
        };

        chain.add_sprout_tree_and_anchor(finalized_tip_height, sprout_note_commitment_tree);
        chain.add_sapling_tree_and_anchor(finalized_tip_height, sapling_note_commitment_tree);
        chain.add_orchard_tree_and_anchor(finalized_tip_height, orchard_note_commitment_tree);
        chain.add_history_tree(finalized_tip_height, history_tree);

        chain
    }

    /// Is the internal state of `self` the same as `other`?
    ///
    /// [`Chain`] has custom [`Eq`] and [`Ord`] implementations based on proof of work,
    /// which are used to select the best chain. So we can't derive [`Eq`] for [`Chain`].
    ///
    /// Unlike the custom trait impls, this method returns `true` if the entire internal state
    /// of two chains is equal.
    ///
    /// If the internal states are different, it returns `false`,
    /// even if the blocks in the two chains are equal.
    #[cfg(any(test, feature = "proptest-impl"))]
    pub fn eq_internal_state(&self, other: &Chain) -> bool {
        self.inner == other.inner
    }

    /// Returns the last fork height if that height is still in the non-finalized state.
    /// Otherwise, if that fork has been finalized, returns `None`.
    #[allow(dead_code)]
    pub fn recent_fork_height(&self) -> Option<Height> {
        self.last_fork_height
            .filter(|last| last >= &self.non_finalized_root_height())
    }

    /// Returns this chain fork's length, if its fork is still in the non-finalized state.
    /// Otherwise, if the fork has been finalized, returns `None`.
    #[allow(dead_code)]
    pub fn recent_fork_length(&self) -> Option<u32> {
        let fork_length = self.non_finalized_tip_height() - self.recent_fork_height()?;

        // If the fork is above the tip, it is invalid, so just return `None`
        // (Ignoring invalid data is ok because this is metrics-only code.)
        fork_length.try_into().ok()
    }

    /// Push a contextually valid non-finalized block into this chain as the new tip.
    ///
    /// If the block is invalid, drops this chain, and returns an error.
    ///
    /// Note: a [`ContextuallyVerifiedBlock`] isn't actually contextually valid until
    /// [`Self::update_chain_tip_with`] returns success.
    #[instrument(level = "debug", skip(self, block), fields(block = %block.block))]
    pub fn push(mut self, block: ContextuallyVerifiedBlock) -> Result<Chain, ValidateContextError> {
        // update cumulative data members
        self.update_chain_tip_with(&block)?;

        tracing::debug!(block = %block.block, "adding block to chain");
        self.blocks.insert(block.height, block);

        Ok(self)
    }

    /// Pops the lowest height block of the non-finalized portion of a chain,
    /// and returns it with its associated treestate.
    #[instrument(level = "debug", skip(self))]
    pub(crate) fn pop_root(&mut self) -> (ContextuallyVerifiedBlock, Treestate) {
        // Obtain the lowest height.
        let block_height = self.non_finalized_root_height();

        // Obtain the treestate associated with the block being finalized.
        let treestate = self
            .treestate(block_height.into())
            .expect("The treestate must be present for the root height.");

        if treestate.note_commitment_trees.sapling_subtree.is_some() {
            self.sapling_subtrees.pop_first();
        }

        if treestate.note_commitment_trees.orchard_subtree.is_some() {
            self.orchard_subtrees.pop_first();
        }

        // Remove the lowest height block from `self.blocks`.
        let block = self
            .blocks
            .remove(&block_height)
            .expect("only called while blocks is populated");

        // Update cumulative data members.
        self.revert_chain_with(&block, RevertPosition::Root);

        (block, treestate)
    }

    /// Returns the height of the chain root.
    pub fn non_finalized_root_height(&self) -> block::Height {
        self.blocks
            .keys()
            .next()
            .cloned()
            .expect("only called while blocks is populated")
    }

    /// Fork and return a chain at the block with the given `fork_tip`, if it is part of this
    /// chain. Otherwise, if this chain does not contain `fork_tip`, returns `None`.
    pub fn fork(&self, fork_tip: block::Hash) -> Option<Self> {
        if !self.height_by_hash.contains_key(&fork_tip) {
            return None;
        }

        let mut forked = self.clone();

        // Revert blocks above the fork
        while forked.non_finalized_tip_hash() != fork_tip {
            forked.pop_tip();

            forked.last_fork_height = Some(forked.non_finalized_tip_height());
        }

        Some(forked)
    }

    /// Returns the [`Network`] for this chain.
    pub fn network(&self) -> Network {
        self.network.clone()
    }

    /// Returns the [`ContextuallyVerifiedBlock`] with [`block::Hash`] or
    /// [`Height`], if it exists in this chain.
    pub fn block(&self, hash_or_height: HashOrHeight) -> Option<&ContextuallyVerifiedBlock> {
        let height =
            hash_or_height.height_or_else(|hash| self.height_by_hash.get(&hash).cloned())?;

        self.blocks.get(&height)
    }

    /// Returns the [`Transaction`] with [`transaction::Hash`], if it exists in this chain.
    pub fn transaction(
        &self,
        hash: transaction::Hash,
    ) -> Option<(&Arc<Transaction>, block::Height)> {
        self.tx_loc_by_hash.get(&hash).map(|tx_loc| {
            (
                &self.blocks[&tx_loc.height].block.transactions[tx_loc.index.as_usize()],
                tx_loc.height,
            )
        })
    }

    /// Returns the [`Transaction`] at [`TransactionLocation`], if it exists in this chain.
    #[allow(dead_code)]
    pub fn transaction_by_loc(&self, tx_loc: TransactionLocation) -> Option<&Arc<Transaction>> {
        self.blocks
            .get(&tx_loc.height)?
            .block
            .transactions
            .get(tx_loc.index.as_usize())
    }

    /// Returns the [`transaction::Hash`] for the transaction at [`TransactionLocation`],
    /// if it exists in this chain.
    #[allow(dead_code)]
    pub fn transaction_hash_by_loc(
        &self,
        tx_loc: TransactionLocation,
    ) -> Option<&transaction::Hash> {
        self.blocks
            .get(&tx_loc.height)?
            .transaction_hashes
            .get(tx_loc.index.as_usize())
    }

    /// Returns the [`transaction::Hash`]es in the block with `hash_or_height`,
    /// if it exists in this chain.
    ///
    /// Hashes are returned in block order.
    ///
    /// Returns `None` if the block is not found.
    pub fn transaction_hashes_for_block(
        &self,
        hash_or_height: HashOrHeight,
    ) -> Option<Arc<[transaction::Hash]>> {
        let transaction_hashes = self.block(hash_or_height)?.transaction_hashes.clone();

        Some(transaction_hashes)
    }

    /// Returns the [`block::Hash`] for `height`, if it exists in this chain.
    pub fn hash_by_height(&self, height: Height) -> Option<block::Hash> {
        let hash = self.blocks.get(&height)?.hash;

        Some(hash)
    }

    /// Returns the [`Height`] for `hash`, if it exists in this chain.
    pub fn height_by_hash(&self, hash: block::Hash) -> Option<Height> {
        self.height_by_hash.get(&hash).cloned()
    }

    /// Returns true is the chain contains the given block hash.
    /// Returns false otherwise.
    pub fn contains_block_hash(&self, hash: block::Hash) -> bool {
        self.height_by_hash.contains_key(&hash)
    }

    /// Returns true is the chain contains the given block height.
    /// Returns false otherwise.
    pub fn contains_block_height(&self, height: Height) -> bool {
        self.blocks.contains_key(&height)
    }

    /// Returns true is the chain contains the given block hash or height.
    /// Returns false otherwise.
    #[allow(dead_code)]
    pub fn contains_hash_or_height(&self, hash_or_height: impl Into<HashOrHeight>) -> bool {
        use HashOrHeight::*;

        let hash_or_height = hash_or_height.into();

        match hash_or_height {
            Hash(hash) => self.contains_block_hash(hash),
            Height(height) => self.contains_block_height(height),
        }
    }

    /// Returns the non-finalized tip block height and hash.
    pub fn non_finalized_tip(&self) -> (Height, block::Hash) {
        (
            self.non_finalized_tip_height(),
            self.non_finalized_tip_hash(),
        )
    }

    /// Returns the Sprout note commitment tree of the tip of this [`Chain`],
    /// including all finalized notes, and the non-finalized notes in this chain.
    ///
    /// If the chain is empty, instead returns the tree of the finalized tip,
    /// which was supplied in [`Chain::new()`]
    ///
    /// # Panics
    ///
    /// If this chain has no sprout trees. (This should be impossible.)
    pub fn sprout_note_commitment_tree_for_tip(&self) -> Arc<sprout::tree::NoteCommitmentTree> {
        self.sprout_trees_by_height
            .last_key_value()
            .expect("only called while sprout_trees_by_height is populated")
            .1
            .clone()
    }

    /// Returns the Sprout [`NoteCommitmentTree`](sprout::tree::NoteCommitmentTree) specified by
    /// a [`HashOrHeight`], if it exists in the non-finalized [`Chain`].
    pub fn sprout_tree(
        &self,
        hash_or_height: HashOrHeight,
    ) -> Option<Arc<sprout::tree::NoteCommitmentTree>> {
        let height =
            hash_or_height.height_or_else(|hash| self.height_by_hash.get(&hash).cloned())?;

        self.sprout_trees_by_height
            .range(..=height)
            .next_back()
            .map(|(_height, tree)| tree.clone())
    }

    /// Adds the Sprout `tree` to the tree and anchor indexes at `height`.
    ///
    /// `height` can be either:
    ///
    /// - the height of a new block that has just been added to the chain tip, or
    /// - the finalized tip height—the height of the parent of the first block of a new chain.
    ///
    /// Stores only the first tree in each series of identical trees.
    ///
    /// # Panics
    ///
    /// - If there's a tree already stored at `height`.
    /// - If there's an anchor already stored at `height`.
    fn add_sprout_tree_and_anchor(
        &mut self,
        height: Height,
        tree: Arc<sprout::tree::NoteCommitmentTree>,
    ) {
        // Having updated all the note commitment trees and nullifier sets in
        // this block, the roots of the note commitment trees as of the last
        // transaction are the anchor treestates of this block.
        //
        // Use the previously cached root which was calculated in parallel.
        let anchor = tree.root();
        trace!(?height, ?anchor, "adding sprout tree");

        // Don't add a new tree unless it differs from the previous one or there's no previous tree.
        if height.is_min()
            || self
                .sprout_tree(
                    height
                        .previous()
                        .expect("Already checked for underflow.")
                        .into(),
                )
                .map_or(true, |prev_tree| prev_tree != tree)
        {
            assert_eq!(
                self.sprout_trees_by_height.insert(height, tree.clone()),
                None,
                "incorrect overwrite of sprout tree: trees must be reverted then inserted",
            );
        }

        // Store the root.
        assert_eq!(
            self.sprout_anchors_by_height.insert(height, anchor),
            None,
            "incorrect overwrite of sprout anchor: anchors must be reverted then inserted",
        );

        // Multiple inserts are expected here,
        // because the anchors only change if a block has shielded transactions.
        self.sprout_anchors.insert(anchor);
        self.sprout_trees_by_anchor.insert(anchor, tree);
    }

    /// Removes the Sprout tree and anchor indexes at `height`.
    ///
    /// `height` can be at two different [`RevertPosition`]s in the chain:
    ///
    /// - a tip block above a chain fork—only the tree and anchor at that height are removed, or
    /// - a root block—all trees and anchors at and below that height are removed, including
    ///   temporary finalized tip trees.
    ///
    ///   # Panics
    ///
    ///  - If the anchor being removed is not present.
    ///  - If there is no tree at `height`.
    fn remove_sprout_tree_and_anchor(&mut self, position: RevertPosition, height: Height) {
        let (removed_heights, highest_removed_tree) = if position == RevertPosition::Root {
            (
                // Remove all trees and anchors at or below the removed block.
                // This makes sure the temporary trees from finalized tip forks are removed.
                self.sprout_anchors_by_height
                    .keys()
                    .cloned()
                    .filter(|index_height| *index_height <= height)
                    .collect(),
                // Cache the highest (rightmost) tree before its removal.
                self.sprout_tree(height.into()),
            )
        } else {
            // Just remove the reverted tip trees and anchors.
            // We don't need to cache the highest (rightmost) tree.
            (vec![height], None)
        };

        for height in &removed_heights {
            let anchor = self
                .sprout_anchors_by_height
                .remove(height)
                .expect("Sprout anchor must be present if block was added to chain");

            self.sprout_trees_by_height.remove(height);

            trace!(?height, ?position, ?anchor, "removing sprout tree");

            // Multiple removals are expected here,
            // because the anchors only change if a block has shielded transactions.
            assert!(
                self.sprout_anchors.remove(&anchor),
                "Sprout anchor must be present if block was added to chain"
            );
            if !self.sprout_anchors.contains(&anchor) {
                self.sprout_trees_by_anchor.remove(&anchor);
            }
        }

        // # Invariant
        //
        // The height following after the removed heights in a non-empty non-finalized state must
        // always have its tree.
        //
        // The loop above can violate the invariant, and if `position` is [`RevertPosition::Root`],
        // it will always violate the invariant. We restore the invariant by storing the highest
        // (rightmost) removed tree just above `height` if there is no tree at that height.
        if !self.is_empty() && height < self.non_finalized_tip_height() {
            let next_height = height
                .next()
                .expect("Zebra should never reach the max height in normal operation.");

            self.sprout_trees_by_height
                .entry(next_height)
                .or_insert_with(|| {
                    highest_removed_tree.expect("There should be a cached removed tree.")
                });
        }
    }

    /// Returns the Sapling note commitment tree of the tip of this [`Chain`],
    /// including all finalized notes, and the non-finalized notes in this chain.
    ///
    /// If the chain is empty, instead returns the tree of the finalized tip,
    /// which was supplied in [`Chain::new()`]
    ///
    /// # Panics
    ///
    /// If this chain has no sapling trees. (This should be impossible.)
    pub fn sapling_note_commitment_tree_for_tip(&self) -> Arc<sapling::tree::NoteCommitmentTree> {
        self.sapling_trees_by_height
            .last_key_value()
            .expect("only called while sapling_trees_by_height is populated")
            .1
            .clone()
    }

    /// Returns the Sapling [`NoteCommitmentTree`](sapling::tree::NoteCommitmentTree) specified
    /// by a [`HashOrHeight`], if it exists in the non-finalized [`Chain`].
    pub fn sapling_tree(
        &self,
        hash_or_height: HashOrHeight,
    ) -> Option<Arc<sapling::tree::NoteCommitmentTree>> {
        let height =
            hash_or_height.height_or_else(|hash| self.height_by_hash.get(&hash).cloned())?;

        self.sapling_trees_by_height
            .range(..=height)
            .next_back()
            .map(|(_height, tree)| tree.clone())
    }

    /// Returns the Sapling [`NoteCommitmentSubtree`] that was completed at a block with
    /// [`HashOrHeight`], if it exists in the non-finalized [`Chain`].
    ///
    /// # Concurrency
    ///
    /// This method should not be used to get subtrees in concurrent code by height,
    /// because the same heights in different chain forks can have different subtrees.
    pub fn sapling_subtree(
        &self,
        hash_or_height: HashOrHeight,
    ) -> Option<NoteCommitmentSubtree<sapling::tree::Node>> {
        let height =
            hash_or_height.height_or_else(|hash| self.height_by_hash.get(&hash).cloned())?;

        self.sapling_subtrees
            .iter()
            .find(|(_index, subtree)| subtree.end_height == height)
            .map(|(index, subtree)| subtree.with_index(*index))
    }

    /// Returns a list of Sapling [`NoteCommitmentSubtree`]s in the provided range.
    ///
    /// Unlike the finalized state and `ReadRequest::SaplingSubtrees`, the returned subtrees
    /// can start after `start_index`. These subtrees are continuous up to the tip.
    ///
    /// There is no API for retrieving single subtrees by index, because it can accidentally be
    /// used to create an inconsistent list of subtrees after concurrent non-finalized and
    /// finalized updates.
    pub fn sapling_subtrees_in_range(
        &self,
        range: impl std::ops::RangeBounds<NoteCommitmentSubtreeIndex>,
    ) -> BTreeMap<NoteCommitmentSubtreeIndex, NoteCommitmentSubtreeData<sapling::tree::Node>> {
        self.sapling_subtrees
            .range(range)
            .map(|(index, subtree)| (*index, *subtree))
            .collect()
    }

    /// Returns the Sapling [`NoteCommitmentSubtree`] if it was completed at the tip height.
    pub fn sapling_subtree_for_tip(&self) -> Option<NoteCommitmentSubtree<sapling::tree::Node>> {
        if !self.is_empty() {
            let tip = self.non_finalized_tip_height();
            self.sapling_subtree(tip.into())
        } else {
            None
        }
    }

    /// Adds the Sapling `tree` to the tree and anchor indexes at `height`.
    ///
    /// `height` can be either:
    ///
    /// - the height of a new block that has just been added to the chain tip, or
    /// - the finalized tip height—the height of the parent of the first block of a new chain.
    ///
    /// Stores only the first tree in each series of identical trees.
    ///
    /// # Panics
    ///
    /// - If there's a tree already stored at `height`.
    /// - If there's an anchor already stored at `height`.
    fn add_sapling_tree_and_anchor(
        &mut self,
        height: Height,
        tree: Arc<sapling::tree::NoteCommitmentTree>,
    ) {
        let anchor = tree.root();
        trace!(?height, ?anchor, "adding sapling tree");

        // Don't add a new tree unless it differs from the previous one or there's no previous tree.
        if height.is_min()
            || self
                .sapling_tree(
                    height
                        .previous()
                        .expect("Already checked for underflow.")
                        .into(),
                )
                .map_or(true, |prev_tree| prev_tree != tree)
        {
            assert_eq!(
                self.sapling_trees_by_height.insert(height, tree),
                None,
                "incorrect overwrite of sapling tree: trees must be reverted then inserted",
            );
        }

        // Store the root.
        assert_eq!(
            self.sapling_anchors_by_height.insert(height, anchor),
            None,
            "incorrect overwrite of sapling anchor: anchors must be reverted then inserted",
        );

        // Multiple inserts are expected here,
        // because the anchors only change if a block has shielded transactions.
        self.sapling_anchors.insert(anchor);
    }

    /// Removes the Sapling tree and anchor indexes at `height`.
    ///
    /// `height` can be at two different [`RevertPosition`]s in the chain:
    ///
    /// - a tip block above a chain fork—only the tree and anchor at that height are removed, or
    /// - a root block—all trees and anchors at and below that height are removed, including
    ///   temporary finalized tip trees.
    ///
    ///   # Panics
    ///
    ///  - If the anchor being removed is not present.
    ///  - If there is no tree at `height`.
    fn remove_sapling_tree_and_anchor(&mut self, position: RevertPosition, height: Height) {
        let (removed_heights, highest_removed_tree) = if position == RevertPosition::Root {
            (
                // Remove all trees and anchors at or below the removed block.
                // This makes sure the temporary trees from finalized tip forks are removed.
                self.sapling_anchors_by_height
                    .keys()
                    .cloned()
                    .filter(|index_height| *index_height <= height)
                    .collect(),
                // Cache the highest (rightmost) tree before its removal.
                self.sapling_tree(height.into()),
            )
        } else {
            // Just remove the reverted tip trees and anchors.
            // We don't need to cache the highest (rightmost) tree.
            (vec![height], None)
        };

        for height in &removed_heights {
            let anchor = self
                .sapling_anchors_by_height
                .remove(height)
                .expect("Sapling anchor must be present if block was added to chain");

            self.sapling_trees_by_height.remove(height);

            trace!(?height, ?position, ?anchor, "removing sapling tree");

            // Multiple removals are expected here,
            // because the anchors only change if a block has shielded transactions.
            assert!(
                self.sapling_anchors.remove(&anchor),
                "Sapling anchor must be present if block was added to chain"
            );
        }

        // # Invariant
        //
        // The height following after the removed heights in a non-empty non-finalized state must
        // always have its tree.
        //
        // The loop above can violate the invariant, and if `position` is [`RevertPosition::Root`],
        // it will always violate the invariant. We restore the invariant by storing the highest
        // (rightmost) removed tree just above `height` if there is no tree at that height.
        if !self.is_empty() && height < self.non_finalized_tip_height() {
            let next_height = height
                .next()
                .expect("Zebra should never reach the max height in normal operation.");

            self.sapling_trees_by_height
                .entry(next_height)
                .or_insert_with(|| {
                    highest_removed_tree.expect("There should be a cached removed tree.")
                });
        }
    }

    /// Returns the Orchard note commitment tree of the tip of this [`Chain`],
    /// including all finalized notes, and the non-finalized notes in this chain.
    ///
    /// If the chain is empty, instead returns the tree of the finalized tip,
    /// which was supplied in [`Chain::new()`]
    ///
    /// # Panics
    ///
    /// If this chain has no orchard trees. (This should be impossible.)
    pub fn orchard_note_commitment_tree_for_tip(&self) -> Arc<orchard::tree::NoteCommitmentTree> {
        self.orchard_trees_by_height
            .last_key_value()
            .expect("only called while orchard_trees_by_height is populated")
            .1
            .clone()
    }

    /// Returns the Orchard
    /// [`NoteCommitmentTree`](orchard::tree::NoteCommitmentTree) specified by a
    /// [`HashOrHeight`], if it exists in the non-finalized [`Chain`].
    pub fn orchard_tree(
        &self,
        hash_or_height: HashOrHeight,
    ) -> Option<Arc<orchard::tree::NoteCommitmentTree>> {
        let height =
            hash_or_height.height_or_else(|hash| self.height_by_hash.get(&hash).cloned())?;

        self.orchard_trees_by_height
            .range(..=height)
            .next_back()
            .map(|(_height, tree)| tree.clone())
    }

    /// Returns the Orchard [`NoteCommitmentSubtree`] that was completed at a block with
    /// [`HashOrHeight`], if it exists in the non-finalized [`Chain`].
    ///
    /// # Concurrency
    ///
    /// This method should not be used to get subtrees in concurrent code by height,
    /// because the same heights in different chain forks can have different subtrees.
    pub fn orchard_subtree(
        &self,
        hash_or_height: HashOrHeight,
    ) -> Option<NoteCommitmentSubtree<orchard::tree::Node>> {
        let height =
            hash_or_height.height_or_else(|hash| self.height_by_hash.get(&hash).cloned())?;

        self.orchard_subtrees
            .iter()
            .find(|(_index, subtree)| subtree.end_height == height)
            .map(|(index, subtree)| subtree.with_index(*index))
    }

    /// Returns a list of Orchard [`NoteCommitmentSubtree`]s in the provided range.
    ///
    /// Unlike the finalized state and `ReadRequest::OrchardSubtrees`, the returned subtrees
    /// can start after `start_index`. These subtrees are continuous up to the tip.
    ///
    /// There is no API for retrieving single subtrees by index, because it can accidentally be
    /// used to create an inconsistent list of subtrees after concurrent non-finalized and
    /// finalized updates.
    pub fn orchard_subtrees_in_range(
        &self,
        range: impl std::ops::RangeBounds<NoteCommitmentSubtreeIndex>,
    ) -> BTreeMap<NoteCommitmentSubtreeIndex, NoteCommitmentSubtreeData<orchard::tree::Node>> {
        self.orchard_subtrees
            .range(range)
            .map(|(index, subtree)| (*index, *subtree))
            .collect()
    }

    /// Returns the Orchard [`NoteCommitmentSubtree`] if it was completed at the tip height.
    pub fn orchard_subtree_for_tip(&self) -> Option<NoteCommitmentSubtree<orchard::tree::Node>> {
        if !self.is_empty() {
            let tip = self.non_finalized_tip_height();
            self.orchard_subtree(tip.into())
        } else {
            None
        }
    }

    /// Adds the Orchard `tree` to the tree and anchor indexes at `height`.
    ///
    /// `height` can be either:
    ///
    /// - the height of a new block that has just been added to the chain tip, or
    /// - the finalized tip height—the height of the parent of the first block of a new chain.
    ///
    /// Stores only the first tree in each series of identical trees.
    ///
    /// # Panics
    ///
    /// - If there's a tree already stored at `height`.
    /// - If there's an anchor already stored at `height`.
    fn add_orchard_tree_and_anchor(
        &mut self,
        height: Height,
        tree: Arc<orchard::tree::NoteCommitmentTree>,
    ) {
        // Having updated all the note commitment trees and nullifier sets in
        // this block, the roots of the note commitment trees as of the last
        // transaction are the anchor treestates of this block.
        //
        // Use the previously cached root which was calculated in parallel.
        let anchor = tree.root();
        trace!(?height, ?anchor, "adding orchard tree");

        // Don't add a new tree unless it differs from the previous one or there's no previous tree.
        if height.is_min()
            || self
                .orchard_tree(
                    height
                        .previous()
                        .expect("Already checked for underflow.")
                        .into(),
                )
                .map_or(true, |prev_tree| prev_tree != tree)
        {
            assert_eq!(
                self.orchard_trees_by_height.insert(height, tree),
                None,
                "incorrect overwrite of orchard tree: trees must be reverted then inserted",
            );
        }

        // Store the root.
        assert_eq!(
            self.orchard_anchors_by_height.insert(height, anchor),
            None,
            "incorrect overwrite of orchard anchor: anchors must be reverted then inserted",
        );

        // Multiple inserts are expected here,
        // because the anchors only change if a block has shielded transactions.
        self.orchard_anchors.insert(anchor);
    }

    /// Removes the Orchard tree and anchor indexes at `height`.
    ///
    /// `height` can be at two different [`RevertPosition`]s in the chain:
    ///
    /// - a tip block above a chain fork—only the tree and anchor at that height are removed, or
    /// - a root block—all trees and anchors at and below that height are removed, including
    ///   temporary finalized tip trees.
    ///
    ///   # Panics
    ///
    ///  - If the anchor being removed is not present.
    ///  - If there is no tree at `height`.
    fn remove_orchard_tree_and_anchor(&mut self, position: RevertPosition, height: Height) {
        let (removed_heights, highest_removed_tree) = if position == RevertPosition::Root {
            (
                // Remove all trees and anchors at or below the removed block.
                // This makes sure the temporary trees from finalized tip forks are removed.
                self.orchard_anchors_by_height
                    .keys()
                    .cloned()
                    .filter(|index_height| *index_height <= height)
                    .collect(),
                // Cache the highest (rightmost) tree before its removal.
                self.orchard_tree(height.into()),
            )
        } else {
            // Just remove the reverted tip trees and anchors.
            // We don't need to cache the highest (rightmost) tree.
            (vec![height], None)
        };

        for height in &removed_heights {
            let anchor = self
                .orchard_anchors_by_height
                .remove(height)
                .expect("Orchard anchor must be present if block was added to chain");

            self.orchard_trees_by_height.remove(height);

            trace!(?height, ?position, ?anchor, "removing orchard tree");

            // Multiple removals are expected here,
            // because the anchors only change if a block has shielded transactions.
            assert!(
                self.orchard_anchors.remove(&anchor),
                "Orchard anchor must be present if block was added to chain"
            );
        }

        // # Invariant
        //
        // The height following after the removed heights in a non-empty non-finalized state must
        // always have its tree.
        //
        // The loop above can violate the invariant, and if `position` is [`RevertPosition::Root`],
        // it will always violate the invariant. We restore the invariant by storing the highest
        // (rightmost) removed tree just above `height` if there is no tree at that height.
        if !self.is_empty() && height < self.non_finalized_tip_height() {
            let next_height = height
                .next()
                .expect("Zebra should never reach the max height in normal operation.");

            self.orchard_trees_by_height
                .entry(next_height)
                .or_insert_with(|| {
                    highest_removed_tree.expect("There should be a cached removed tree.")
                });
        }
    }

    /// Returns the History tree of the tip of this [`Chain`],
    /// including all finalized blocks, and the non-finalized blocks below the chain tip.
    ///
    /// If the chain is empty, instead returns the tree of the finalized tip,
    /// which was supplied in [`Chain::new()`]
    ///
    /// # Panics
    ///
    /// If this chain has no history trees. (This should be impossible.)
    pub fn history_block_commitment_tree(&self) -> Arc<HistoryTree> {
        self.history_trees_by_height
            .last_key_value()
            .expect("only called while history_trees_by_height is populated")
            .1
            .clone()
    }

    /// Returns the [`HistoryTree`] specified by a [`HashOrHeight`], if it
    /// exists in the non-finalized [`Chain`].
    pub fn history_tree(&self, hash_or_height: HashOrHeight) -> Option<Arc<HistoryTree>> {
        let height =
            hash_or_height.height_or_else(|hash| self.height_by_hash.get(&hash).cloned())?;

        self.history_trees_by_height.get(&height).cloned()
    }

    /// Add the History `tree` to the history tree index at `height`.
    ///
    /// `height` can be either:
    /// - the height of a new block that has just been added to the chain tip, or
    /// - the finalized tip height: the height of the parent of the first block of a new chain.
    fn add_history_tree(&mut self, height: Height, tree: Arc<HistoryTree>) {
        // The history tree commits to all the blocks before this block.
        //
        // Use the previously cached root which was calculated in parallel.
        trace!(?height, "adding history tree");

        assert_eq!(
            self.history_trees_by_height.insert(height, tree),
            None,
            "incorrect overwrite of history tree: trees must be reverted then inserted",
        );
    }

    /// Remove the History tree index at `height`.
    ///
    /// `height` can be at two different [`RevertPosition`]s in the chain:
    /// - a tip block above a chain fork: only that height is removed, or
    /// - a root block: all trees below that height are removed,
    ///   including temporary finalized tip trees.
    fn remove_history_tree(&mut self, position: RevertPosition, height: Height) {
        trace!(?height, ?position, "removing history tree");

        if position == RevertPosition::Root {
            // Remove all trees at or below the reverted root block.
            // This makes sure the temporary trees from finalized tip forks are removed.
            self.history_trees_by_height
                .retain(|index_height, _tree| *index_height > height);
        } else {
            // Just remove the reverted tip tree.
            self.history_trees_by_height
                .remove(&height)
                .expect("History tree must be present if block was added to chain");
        }
    }

    fn treestate(&self, hash_or_height: HashOrHeight) -> Option<Treestate> {
        let sprout_tree = self.sprout_tree(hash_or_height)?;
        let sapling_tree = self.sapling_tree(hash_or_height)?;
        let orchard_tree = self.orchard_tree(hash_or_height)?;
        let history_tree = self.history_tree(hash_or_height)?;
        let sapling_subtree = self.sapling_subtree(hash_or_height);
        let orchard_subtree = self.orchard_subtree(hash_or_height);

        Some(Treestate::new(
            sprout_tree,
            sapling_tree,
            orchard_tree,
            sapling_subtree,
            orchard_subtree,
            history_tree,
        ))
    }

    /// Returns the block hash of the tip block.
    pub fn non_finalized_tip_hash(&self) -> block::Hash {
        self.blocks
            .values()
            .next_back()
            .expect("only called while blocks is populated")
            .hash
    }

    /// Returns the non-finalized root block hash and height.
    #[allow(dead_code)]
    pub fn non_finalized_root(&self) -> (block::Hash, block::Height) {
        (
            self.non_finalized_root_hash(),
            self.non_finalized_root_height(),
        )
    }

    /// Returns the block hash of the non-finalized root block.
    pub fn non_finalized_root_hash(&self) -> block::Hash {
        self.blocks
            .values()
            .next()
            .expect("only called while blocks is populated")
            .hash
    }

    /// Returns the block hash of the `n`th block from the non-finalized root.
    ///
    /// This is the block at `non_finalized_root_height() + n`.
    #[allow(dead_code)]
    pub fn non_finalized_nth_hash(&self, n: usize) -> Option<block::Hash> {
        self.blocks.values().nth(n).map(|block| block.hash)
    }

    /// Remove the highest height block of the non-finalized portion of a chain.
    fn pop_tip(&mut self) {
        let block_height = self.non_finalized_tip_height();

        let block = self
            .blocks
            .remove(&block_height)
            .expect("only called while blocks is populated");

        assert!(
            !self.blocks.is_empty(),
            "Non-finalized chains must have at least one block to be valid"
        );

        self.revert_chain_with(&block, RevertPosition::Tip);
    }

    /// Return the non-finalized tip height for this chain.
    ///
    /// # Panics
    ///
    /// Panics if called while the chain is empty,
    /// or while the chain is updating its internal state with the first block.
    pub fn non_finalized_tip_height(&self) -> block::Height {
        self.max_block_height()
            .expect("only called while blocks is populated")
    }

    /// Return the non-finalized tip height for this chain,
    /// or `None` if `self.blocks` is empty.
    fn max_block_height(&self) -> Option<block::Height> {
        self.blocks.keys().next_back().cloned()
    }

    /// Return the non-finalized tip block for this chain,
    /// or `None` if `self.blocks` is empty.
    pub fn tip_block(&self) -> Option<&ContextuallyVerifiedBlock> {
        self.blocks.values().next_back()
    }

    /// Returns true if the non-finalized part of this chain is empty.
    pub fn is_empty(&self) -> bool {
        self.blocks.is_empty()
    }

    /// Returns the non-finalized length of this chain.
    #[allow(dead_code)]
    pub fn len(&self) -> usize {
        self.blocks.len()
    }

    /// Returns the unspent transaction outputs (UTXOs) in this non-finalized chain.
    ///
    /// Callers should also check the finalized state for available UTXOs.
    /// If UTXOs remain unspent when a block is finalized, they are stored in the finalized state,
    /// and removed from the relevant chain(s).
    pub fn unspent_utxos(&self) -> HashMap<transparent::OutPoint, transparent::OrderedUtxo> {
        let mut unspent_utxos = self.created_utxos.clone();
        unspent_utxos.retain(|outpoint, _utxo| !self.spent_utxos.contains(outpoint));

        unspent_utxos
    }

    /// Returns the [`transparent::Utxo`] pointed to by the given
    /// [`transparent::OutPoint`] if it was created by this chain.
    ///
    /// UTXOs are returned regardless of whether they have been spent.
    pub fn created_utxo(&self, outpoint: &transparent::OutPoint) -> Option<transparent::Utxo> {
        if let Some(utxo) = self.created_utxos.get(outpoint) {
            return Some(utxo.utxo.clone());
        }

        None
    }

    // Address index queries

    /// Returns the transparent transfers for `addresses` in this non-finalized chain.
    ///
    /// If none of the addresses have an address index, returns an empty iterator.
    ///
    /// # Correctness
    ///
    /// Callers should apply the returned indexes to the corresponding finalized state indexes.
    ///
    /// The combined result will only be correct if the chains match.
    /// The exact type of match varies by query.
    pub fn partial_transparent_indexes<'a>(
        &'a self,
        addresses: &'a HashSet<transparent::Address>,
    ) -> impl Iterator<Item = &TransparentTransfers> {
        addresses
            .iter()
            .flat_map(|address| self.partial_transparent_transfers.get(address))
    }

    /// Returns the transparent balance change for `addresses` in this non-finalized chain.
    ///
    /// If the balance doesn't change for any of the addresses, returns zero.
    ///
    /// # Correctness
    ///
    /// Callers should apply this balance change to the finalized state balance for `addresses`.
    ///
    /// The total balance will only be correct if this partial chain matches the finalized state.
    /// Specifically, the root of this partial chain must be a child block of the finalized tip.
    pub fn partial_transparent_balance_change(
        &self,
        addresses: &HashSet<transparent::Address>,
    ) -> Amount<NegativeAllowed> {
        let balance_change: Result<Amount<NegativeAllowed>, _> = self
            .partial_transparent_indexes(addresses)
            .map(|transfers| transfers.balance())
            .sum();

        balance_change.expect(
            "unexpected amount overflow: value balances are valid, so partial sum should be valid",
        )
    }

    /// Returns the transparent UTXO changes for `addresses` in this non-finalized chain.
    ///
    /// If the UTXOs don't change for any of the addresses, returns empty lists.
    ///
    /// # Correctness
    ///
    /// Callers should apply these non-finalized UTXO changes to the finalized state UTXOs.
    ///
    /// The UTXOs will only be correct if the non-finalized chain matches or overlaps with
    /// the finalized state.
    ///
    /// Specifically, a block in the partial chain must be a child block of the finalized tip.
    /// (But the child block does not have to be the partial chain root.)
    pub fn partial_transparent_utxo_changes(
        &self,
        addresses: &HashSet<transparent::Address>,
    ) -> (
        BTreeMap<OutputLocation, transparent::Output>,
        BTreeSet<OutputLocation>,
    ) {
        let created_utxos = self
            .partial_transparent_indexes(addresses)
            .flat_map(|transfers| transfers.created_utxos())
            .map(|(out_loc, output)| (*out_loc, output.clone()))
            .collect();

        let spent_utxos = self
            .partial_transparent_indexes(addresses)
            .flat_map(|transfers| transfers.spent_utxos())
            .cloned()
            .collect();

        (created_utxos, spent_utxos)
    }

    /// Returns the [`transaction::Hash`]es used by `addresses` to receive or spend funds,
    /// in the non-finalized chain, filtered using the `query_height_range`.
    ///
    /// If none of the addresses receive or spend funds in this partial chain, returns an empty list.
    ///
    /// # Correctness
    ///
    /// Callers should combine these non-finalized transactions with the finalized state transactions.
    ///
    /// The transaction IDs will only be correct if the non-finalized chain matches or overlaps with
    /// the finalized state.
    ///
    /// Specifically, a block in the partial chain must be a child block of the finalized tip.
    /// (But the child block does not have to be the partial chain root.)
    ///
    /// This condition does not apply if there is only one address.
    /// Since address transactions are only appended by blocks,
    /// and the finalized state query reads them in order,
    /// it is impossible to get inconsistent transactions for a single address.
    pub fn partial_transparent_tx_ids(
        &self,
        addresses: &HashSet<transparent::Address>,
        query_height_range: RangeInclusive<Height>,
    ) -> BTreeMap<TransactionLocation, transaction::Hash> {
        self.partial_transparent_indexes(addresses)
            .flat_map(|transfers| {
                transfers.tx_ids(&self.tx_loc_by_hash, query_height_range.clone())
            })
            .collect()
    }

    /// Update the chain tip with the `contextually_valid` block,
    /// running note commitment tree updates in parallel with other updates.
    ///
    /// Used to implement `update_chain_tip_with::<ContextuallyVerifiedBlock>`.
    #[instrument(skip(self, contextually_valid), fields(block = %contextually_valid.block))]
    #[allow(clippy::unwrap_in_result)]
    fn update_chain_tip_with_block_parallel(
        &mut self,
        contextually_valid: &ContextuallyVerifiedBlock,
    ) -> Result<(), ValidateContextError> {
        let height = contextually_valid.height;

        // Prepare data for parallel execution
        let mut nct = NoteCommitmentTrees {
            sprout: self.sprout_note_commitment_tree_for_tip(),
            sapling: self.sapling_note_commitment_tree_for_tip(),
            sapling_subtree: self.sapling_subtree_for_tip(),
            orchard: self.orchard_note_commitment_tree_for_tip(),
            orchard_subtree: self.orchard_subtree_for_tip(),
        };

        let mut tree_result = None;
        let mut partial_result = None;

        // Run 4 tasks in parallel:
        // - sprout, sapling, and orchard tree updates and root calculations
        // - the rest of the Chain updates
        rayon::in_place_scope_fifo(|scope| {
            // Spawns a separate rayon task for each note commitment tree
            tree_result = Some(nct.update_trees_parallel(&contextually_valid.block.clone()));

            scope.spawn_fifo(|_scope| {
                partial_result =
                    Some(self.update_chain_tip_with_block_except_trees(contextually_valid));
            });
        });

        tree_result.expect("scope has already finished")?;
        partial_result.expect("scope has already finished")?;

        // Update the note commitment trees in the chain.
        self.add_sprout_tree_and_anchor(height, nct.sprout);
        self.add_sapling_tree_and_anchor(height, nct.sapling);
        self.add_orchard_tree_and_anchor(height, nct.orchard);

        if let Some(subtree) = nct.sapling_subtree {
            self.sapling_subtrees
                .insert(subtree.index, subtree.into_data());
        }
        if let Some(subtree) = nct.orchard_subtree {
            self.orchard_subtrees
                .insert(subtree.index, subtree.into_data());
        }

        let sapling_root = self.sapling_note_commitment_tree_for_tip().root();
        let orchard_root = self.orchard_note_commitment_tree_for_tip().root();

        // TODO: update the history trees in a rayon thread, if they show up in CPU profiles
        let mut history_tree = self.history_block_commitment_tree();
        let history_tree_mut = Arc::make_mut(&mut history_tree);
        history_tree_mut
            .push(
                &self.network,
                contextually_valid.block.clone(),
                &sapling_root,
                &orchard_root,
            )
            .map_err(Arc::new)?;

        self.add_history_tree(height, history_tree);

        Ok(())
    }

    /// Update the chain tip with the `contextually_valid` block,
    /// except for the note commitment and history tree updates.
    ///
    /// Used to implement `update_chain_tip_with::<ContextuallyVerifiedBlock>`.
    #[instrument(skip(self, contextually_valid), fields(block = %contextually_valid.block))]
    #[allow(clippy::unwrap_in_result)]
    fn update_chain_tip_with_block_except_trees(
        &mut self,
        contextually_valid: &ContextuallyVerifiedBlock,
    ) -> Result<(), ValidateContextError> {
        let (
            block,
            hash,
            height,
            new_outputs,
            spent_outputs,
            transaction_hashes,
            chain_value_pool_change,
        ) = (
            contextually_valid.block.as_ref(),
            contextually_valid.hash,
            contextually_valid.height,
            &contextually_valid.new_outputs,
            &contextually_valid.spent_outputs,
            &contextually_valid.transaction_hashes,
            &contextually_valid.chain_value_pool_change,
        );

        // add hash to height_by_hash
        let prior_height = self.height_by_hash.insert(hash, height);
        assert!(
            prior_height.is_none(),
            "block heights must be unique within a single chain"
        );

        // add work to partial cumulative work
        let block_work = block
            .header
            .difficulty_threshold
            .to_work()
            .expect("work has already been validated");
        self.partial_cumulative_work += block_work;

        // for each transaction in block
        for (transaction_index, (transaction, transaction_hash)) in block
            .transactions
            .iter()
            .zip(transaction_hashes.iter().cloned())
            .enumerate()
        {
            let (
                inputs,
                outputs,
                joinsplit_data,
                sapling_shielded_data_per_spend_anchor,
                sapling_shielded_data_shared_anchor,
                orchard_shielded_data,
            ) = match transaction.deref() {
                V4 {
                    inputs,
                    outputs,
                    joinsplit_data,
                    sapling_shielded_data,
                    ..
                } => (inputs, outputs, joinsplit_data, sapling_shielded_data, &None, &None),
                V5 {
                    inputs,
                    outputs,
                    sapling_shielded_data,
                    orchard_shielded_data,
                    ..
                } => (
                    inputs,
                    outputs,
                    &None,
                    &None,
                    sapling_shielded_data,
                    orchard_shielded_data,
                ),
                V1 { .. } | V2 { .. } | V3 { .. } => unreachable!(
                    "older transaction versions only exist in finalized blocks, because of the mandatory canopy checkpoint",
                ),
            };

            // add key `transaction.hash` and value `(height, tx_index)` to `tx_loc_by_hash`
            let transaction_location = TransactionLocation::from_usize(height, transaction_index);
            let prior_pair = self
                .tx_loc_by_hash
                .insert(transaction_hash, transaction_location);
            assert_eq!(
                prior_pair, None,
                "transactions must be unique within a single chain"
            );

            // add the utxos this produced
            self.update_chain_tip_with(&(outputs, &transaction_hash, new_outputs))?;
            // delete the utxos this consumed
            self.update_chain_tip_with(&(inputs, &transaction_hash, spent_outputs))?;

            // add the shielded data
            self.update_chain_tip_with(joinsplit_data)?;
            self.update_chain_tip_with(sapling_shielded_data_per_spend_anchor)?;
            self.update_chain_tip_with(sapling_shielded_data_shared_anchor)?;
            self.update_chain_tip_with(orchard_shielded_data)?;
        }

        // update the chain value pool balances
        self.update_chain_tip_with(chain_value_pool_change)?;

        Ok(())
    }
}

impl Deref for Chain {
    type Target = ChainInner;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl DerefMut for Chain {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

/// The revert position being performed on a chain.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash)]
enum RevertPosition {
    /// The chain root is being reverted via [`Chain::pop_root`], when a block
    /// is finalized.
    Root,

    /// The chain tip is being reverted via [`Chain::pop_tip`],
    /// when a chain is forked.
    Tip,
}

/// Helper trait to organize inverse operations done on the [`Chain`] type.
///
/// Used to overload update and revert methods, based on the type of the argument,
/// and the position of the removed block in the chain.
///
/// This trait was motivated by the length of the `push`, [`Chain::pop_root`],
/// and [`Chain::pop_tip`] functions, and fear that it would be easy to
/// introduce bugs when updating them, unless the code was reorganized to keep
/// related operations adjacent to each other.
trait UpdateWith<T> {
    /// When `T` is added to the chain tip,
    /// update [`Chain`] cumulative data members to add data that are derived from `T`.
    fn update_chain_tip_with(&mut self, _: &T) -> Result<(), ValidateContextError>;

    /// When `T` is removed from `position` in the chain,
    /// revert [`Chain`] cumulative data members to remove data that are derived from `T`.
    fn revert_chain_with(&mut self, _: &T, position: RevertPosition);
}

impl UpdateWith<ContextuallyVerifiedBlock> for Chain {
    #[instrument(skip(self, contextually_valid), fields(block = %contextually_valid.block))]
    #[allow(clippy::unwrap_in_result)]
    fn update_chain_tip_with(
        &mut self,
        contextually_valid: &ContextuallyVerifiedBlock,
    ) -> Result<(), ValidateContextError> {
        self.update_chain_tip_with_block_parallel(contextually_valid)
    }

    #[instrument(skip(self, contextually_valid), fields(block = %contextually_valid.block))]
    fn revert_chain_with(
        &mut self,
        contextually_valid: &ContextuallyVerifiedBlock,
        position: RevertPosition,
    ) {
        let (
            block,
            hash,
            height,
            new_outputs,
            spent_outputs,
            transaction_hashes,
            chain_value_pool_change,
        ) = (
            contextually_valid.block.as_ref(),
            contextually_valid.hash,
            contextually_valid.height,
            &contextually_valid.new_outputs,
            &contextually_valid.spent_outputs,
            &contextually_valid.transaction_hashes,
            &contextually_valid.chain_value_pool_change,
        );

        // remove the blocks hash from `height_by_hash`
        assert!(
            self.height_by_hash.remove(&hash).is_some(),
            "hash must be present if block was added to chain"
        );

        // TODO: move this to a Work or block header UpdateWith.revert...()?
        // remove work from partial_cumulative_work
        let block_work = block
            .header
            .difficulty_threshold
            .to_work()
            .expect("work has already been validated");
        self.partial_cumulative_work -= block_work;

        // for each transaction in block
        for (transaction, transaction_hash) in
            block.transactions.iter().zip(transaction_hashes.iter())
        {
            let (
                inputs,
                outputs,
                joinsplit_data,
                sapling_shielded_data_per_spend_anchor,
                sapling_shielded_data_shared_anchor,
                orchard_shielded_data,
            ) = match transaction.deref() {
                V4 {
                    inputs,
                    outputs,
                    joinsplit_data,
                    sapling_shielded_data,
                    ..
                } => (inputs, outputs, joinsplit_data, sapling_shielded_data, &None, &None),
                V5 {
                    inputs,
                    outputs,
                    sapling_shielded_data,
                    orchard_shielded_data,
                    ..
                } => (
                    inputs,
                    outputs,
                    &None,
                    &None,
                    sapling_shielded_data,
                    orchard_shielded_data,
                ),
                V1 { .. } | V2 { .. } | V3 { .. } => unreachable!(
                    "older transaction versions only exist in finalized blocks, because of the mandatory canopy checkpoint",
                ),
            };

            // remove the utxos this produced
            self.revert_chain_with(&(outputs, transaction_hash, new_outputs), position);
            // reset the utxos this consumed
            self.revert_chain_with(&(inputs, transaction_hash, spent_outputs), position);

            // TODO: move this to the history tree UpdateWith.revert...()?
            // remove `transaction.hash` from `tx_loc_by_hash`
            assert!(
                self.tx_loc_by_hash.remove(transaction_hash).is_some(),
                "transactions must be present if block was added to chain"
            );

            // remove the shielded data
            self.revert_chain_with(joinsplit_data, position);
            self.revert_chain_with(sapling_shielded_data_per_spend_anchor, position);
            self.revert_chain_with(sapling_shielded_data_shared_anchor, position);
            self.revert_chain_with(orchard_shielded_data, position);
        }

        // TODO: move these to the shielded UpdateWith.revert...()?
        self.remove_sprout_tree_and_anchor(position, height);
        self.remove_sapling_tree_and_anchor(position, height);
        self.remove_orchard_tree_and_anchor(position, height);

        // TODO: move this to the history tree UpdateWith.revert...()?
        self.remove_history_tree(position, height);

        // revert the chain value pool balances, if needed
        self.revert_chain_with(chain_value_pool_change, position);
    }
}

// Created UTXOs
//
// TODO: replace arguments with a struct
impl
    UpdateWith<(
        // The outputs from a transaction in this block
        &Vec<transparent::Output>,
        // The hash of the transaction that the outputs are from
        &transaction::Hash,
        // The UTXOs for all outputs created by this transaction (or block)
        &HashMap<transparent::OutPoint, transparent::OrderedUtxo>,
    )> for Chain
{
    #[allow(clippy::unwrap_in_result)]
    fn update_chain_tip_with(
        &mut self,
        &(created_outputs, creating_tx_hash, block_created_outputs): &(
            &Vec<transparent::Output>,
            &transaction::Hash,
            &HashMap<transparent::OutPoint, transparent::OrderedUtxo>,
        ),
    ) -> Result<(), ValidateContextError> {
        for output_index in 0..created_outputs.len() {
            let outpoint = transparent::OutPoint {
                hash: *creating_tx_hash,
                index: output_index.try_into().expect("valid indexes fit in u32"),
            };
            let created_utxo = block_created_outputs
                .get(&outpoint)
                .expect("new_outputs contains all created UTXOs");

            // Update the chain's created UTXOs
            let previous_entry = self.created_utxos.insert(outpoint, created_utxo.clone());
            assert_eq!(
                previous_entry, None,
                "unexpected created output: duplicate update or duplicate UTXO",
            );

            // Update the address index with this UTXO
            if let Some(receiving_address) = created_utxo.utxo.output.address(&self.network) {
                let address_transfers = self
                    .partial_transparent_transfers
                    .entry(receiving_address)
                    .or_default();

                address_transfers.update_chain_tip_with(&(&outpoint, created_utxo))?;
            }
        }

        Ok(())
    }

    fn revert_chain_with(
        &mut self,
        &(created_outputs, creating_tx_hash, block_created_outputs): &(
            &Vec<transparent::Output>,
            &transaction::Hash,
            &HashMap<transparent::OutPoint, transparent::OrderedUtxo>,
        ),
        position: RevertPosition,
    ) {
        for output_index in 0..created_outputs.len() {
            let outpoint = transparent::OutPoint {
                hash: *creating_tx_hash,
                index: output_index.try_into().expect("valid indexes fit in u32"),
            };
            let created_utxo = block_created_outputs
                .get(&outpoint)
                .expect("new_outputs contains all created UTXOs");

            // Revert the chain's created UTXOs
            let removed_entry = self.created_utxos.remove(&outpoint);
            assert!(
                removed_entry.is_some(),
                "unexpected revert of created output: duplicate revert or duplicate UTXO",
            );

            // Revert the address index for this UTXO
            if let Some(receiving_address) = created_utxo.utxo.output.address(&self.network) {
                let address_transfers = self
                    .partial_transparent_transfers
                    .get_mut(&receiving_address)
                    .expect("block has previously been applied to the chain");

                address_transfers.revert_chain_with(&(&outpoint, created_utxo), position);

                // Remove this transfer if it is now empty
                if address_transfers.is_empty() {
                    self.partial_transparent_transfers
                        .remove(&receiving_address);
                }
            }
        }
    }
}

// Transparent inputs
//
// TODO: replace arguments with a struct
impl
    UpdateWith<(
        // The inputs from a transaction in this block
        &Vec<transparent::Input>,
        // The hash of the transaction that the inputs are from
        // (not the transaction the spent output was created by)
        &transaction::Hash,
        // The outputs for all inputs spent in this transaction (or block)
        &HashMap<transparent::OutPoint, transparent::OrderedUtxo>,
    )> for Chain
{
    fn update_chain_tip_with(
        &mut self,
        &(spending_inputs, spending_tx_hash, spent_outputs): &(
            &Vec<transparent::Input>,
            &transaction::Hash,
            &HashMap<transparent::OutPoint, transparent::OrderedUtxo>,
        ),
    ) -> Result<(), ValidateContextError> {
        for spending_input in spending_inputs.iter() {
            let spent_outpoint = if let Some(spent_outpoint) = spending_input.outpoint() {
                spent_outpoint
            } else {
                continue;
            };

            // Index the spent outpoint in the chain
            let first_spend = self.spent_utxos.insert(spent_outpoint);
            assert!(
                first_spend,
                "unexpected duplicate spent output: should be checked earlier"
            );

            // TODO: fix tests to supply correct spent outputs, then turn this into an expect()
            let spent_output = if let Some(spent_output) = spent_outputs.get(&spent_outpoint) {
                spent_output
            } else if !cfg!(test) {
                panic!("unexpected missing spent output: all spent outputs must be indexed");
            } else {
                continue;
            };

            // Index the spent output for the address
            if let Some(spending_address) = spent_output.utxo.output.address(&self.network) {
                let address_transfers = self
                    .partial_transparent_transfers
                    .entry(spending_address)
                    .or_default();

                address_transfers.update_chain_tip_with(&(
                    spending_input,
                    spending_tx_hash,
                    spent_output,
                ))?;
            }
        }

        Ok(())
    }

    fn revert_chain_with(
        &mut self,
        &(spending_inputs, spending_tx_hash, spent_outputs): &(
            &Vec<transparent::Input>,
            &transaction::Hash,
            &HashMap<transparent::OutPoint, transparent::OrderedUtxo>,
        ),
        position: RevertPosition,
    ) {
        for spending_input in spending_inputs.iter() {
            let spent_outpoint = if let Some(spent_outpoint) = spending_input.outpoint() {
                spent_outpoint
            } else {
                continue;
            };

            // Revert the spent outpoint in the chain
            let spent_outpoint_was_removed = self.spent_utxos.remove(&spent_outpoint);
            assert!(
                spent_outpoint_was_removed,
                "spent_utxos must be present if block was added to chain"
            );

            // TODO: fix tests to supply correct spent outputs, then turn this into an expect()
            let spent_output = if let Some(spent_output) = spent_outputs.get(&spent_outpoint) {
                spent_output
            } else if !cfg!(test) {
                panic!(
                    "unexpected missing reverted spent output: all spent outputs must be indexed"
                );
            } else {
                continue;
            };

            // Revert the spent output for the address
            if let Some(receiving_address) = spent_output.utxo.output.address(&self.network) {
                let address_transfers = self
                    .partial_transparent_transfers
                    .get_mut(&receiving_address)
                    .expect("block has previously been applied to the chain");

                address_transfers
                    .revert_chain_with(&(spending_input, spending_tx_hash, spent_output), position);

                // Remove this transfer if it is now empty
                if address_transfers.is_empty() {
                    self.partial_transparent_transfers
                        .remove(&receiving_address);
                }
            }
        }
    }
}

impl UpdateWith<Option<transaction::JoinSplitData<Groth16Proof>>> for Chain {
    #[instrument(skip(self, joinsplit_data))]
    fn update_chain_tip_with(
        &mut self,
        joinsplit_data: &Option<transaction::JoinSplitData<Groth16Proof>>,
    ) -> Result<(), ValidateContextError> {
        if let Some(joinsplit_data) = joinsplit_data {
            // We do note commitment tree updates in parallel rayon threads.

            check::nullifier::add_to_non_finalized_chain_unique(
                &mut self.sprout_nullifiers,
                joinsplit_data.nullifiers(),
            )?;
        }
        Ok(())
    }

    /// # Panics
    ///
    /// Panics if any nullifier is missing from the chain when we try to remove it.
    ///
    /// See [`check::nullifier::remove_from_non_finalized_chain`] for details.
    #[instrument(skip(self, joinsplit_data))]
    fn revert_chain_with(
        &mut self,
        joinsplit_data: &Option<transaction::JoinSplitData<Groth16Proof>>,
        _position: RevertPosition,
    ) {
        if let Some(joinsplit_data) = joinsplit_data {
            // Note commitments are removed from the Chain during a fork,
            // by removing trees above the fork height from the note commitment index.
            // This happens when reverting the block itself.

            check::nullifier::remove_from_non_finalized_chain(
                &mut self.sprout_nullifiers,
                joinsplit_data.nullifiers(),
            );
        }
    }
}

impl<AnchorV> UpdateWith<Option<sapling::ShieldedData<AnchorV>>> for Chain
where
    AnchorV: sapling::AnchorVariant + Clone,
{
    #[instrument(skip(self, sapling_shielded_data))]
    fn update_chain_tip_with(
        &mut self,
        sapling_shielded_data: &Option<sapling::ShieldedData<AnchorV>>,
    ) -> Result<(), ValidateContextError> {
        if let Some(sapling_shielded_data) = sapling_shielded_data {
            // We do note commitment tree updates in parallel rayon threads.

            check::nullifier::add_to_non_finalized_chain_unique(
                &mut self.sapling_nullifiers,
                sapling_shielded_data.nullifiers(),
            )?;
        }
        Ok(())
    }

    /// # Panics
    ///
    /// Panics if any nullifier is missing from the chain when we try to remove it.
    ///
    /// See [`check::nullifier::remove_from_non_finalized_chain`] for details.
    #[instrument(skip(self, sapling_shielded_data))]
    fn revert_chain_with(
        &mut self,
        sapling_shielded_data: &Option<sapling::ShieldedData<AnchorV>>,
        _position: RevertPosition,
    ) {
        if let Some(sapling_shielded_data) = sapling_shielded_data {
            // Note commitments are removed from the Chain during a fork,
            // by removing trees above the fork height from the note commitment index.
            // This happens when reverting the block itself.

            check::nullifier::remove_from_non_finalized_chain(
                &mut self.sapling_nullifiers,
                sapling_shielded_data.nullifiers(),
            );
        }
    }
}

impl UpdateWith<Option<orchard::ShieldedData>> for Chain {
    #[instrument(skip(self, orchard_shielded_data))]
    fn update_chain_tip_with(
        &mut self,
        orchard_shielded_data: &Option<orchard::ShieldedData>,
    ) -> Result<(), ValidateContextError> {
        if let Some(orchard_shielded_data) = orchard_shielded_data {
            // We do note commitment tree updates in parallel rayon threads.

            check::nullifier::add_to_non_finalized_chain_unique(
                &mut self.orchard_nullifiers,
                orchard_shielded_data.nullifiers(),
            )?;
        }
        Ok(())
    }

    /// # Panics
    ///
    /// Panics if any nullifier is missing from the chain when we try to remove it.
    ///
    /// See [`check::nullifier::remove_from_non_finalized_chain`] for details.
    #[instrument(skip(self, orchard_shielded_data))]
    fn revert_chain_with(
        &mut self,
        orchard_shielded_data: &Option<orchard::ShieldedData>,
        _position: RevertPosition,
    ) {
        if let Some(orchard_shielded_data) = orchard_shielded_data {
            // Note commitments are removed from the Chain during a fork,
            // by removing trees above the fork height from the note commitment index.
            // This happens when reverting the block itself.

            check::nullifier::remove_from_non_finalized_chain(
                &mut self.orchard_nullifiers,
                orchard_shielded_data.nullifiers(),
            );
        }
    }
}

impl UpdateWith<ValueBalance<NegativeAllowed>> for Chain {
    fn update_chain_tip_with(
        &mut self,
        block_value_pool_change: &ValueBalance<NegativeAllowed>,
    ) -> Result<(), ValidateContextError> {
        match self
            .chain_value_pools
            .add_chain_value_pool_change(*block_value_pool_change)
        {
            Ok(chain_value_pools) => self.chain_value_pools = chain_value_pools,
            Err(value_balance_error) => Err(ValidateContextError::AddValuePool {
                value_balance_error,
                chain_value_pools: self.chain_value_pools,
                block_value_pool_change: *block_value_pool_change,
                // assume that the current block is added to `blocks` after `update_chain_tip_with`
                height: self.max_block_height().and_then(|height| height + 1),
            })?,
        };

        Ok(())
    }

    /// Revert the chain state using a block chain value pool change.
    ///
    /// When forking from the tip, subtract the block's chain value pool change.
    ///
    /// When finalizing the root, leave the chain value pool balances unchanged.
    /// [`ChainInner::chain_value_pools`] tracks the chain value pools for all finalized blocks, and
    /// the non-finalized blocks in this chain. So finalizing the root doesn't change the set of
    /// blocks it tracks.
    ///
    /// # Panics
    ///
    /// Panics if the chain pool value balance is invalid after we subtract the block value pool
    /// change.
    fn revert_chain_with(
        &mut self,
        block_value_pool_change: &ValueBalance<NegativeAllowed>,
        position: RevertPosition,
    ) {
        use std::ops::Neg;

        if position == RevertPosition::Tip {
            self.chain_value_pools = self
                .chain_value_pools
                .add_chain_value_pool_change(block_value_pool_change.neg())
                .expect("reverting the tip will leave the pools in a previously valid state");
        }
    }
}

impl Ord for Chain {
    /// Chain order for the [`NonFinalizedState`][1]'s `chain_set`.
    ///
    /// Chains with higher cumulative Proof of Work are [`Ordering::Greater`],
    /// breaking ties using the tip block hash.
    ///
    /// Despite the consensus rules, Zebra uses the tip block hash as a
    /// tie-breaker. Zebra blocks are downloaded in parallel, so download
    /// timestamps may not be unique. (And Zebra currently doesn't track
    /// download times, because [`Block`](block::Block)s are immutable.)
    ///
    /// This departure from the consensus rules may delay network convergence,
    /// for as long as the greater hash belongs to the later mined block.
    /// But Zebra nodes should converge as soon as the tied work is broken.
    ///
    /// "At a given point in time, each full validator is aware of a set of candidate blocks.
    /// These form a tree rooted at the genesis block, where each node in the tree
    /// refers to its parent via the hashPrevBlock block header field.
    ///
    /// A path from the root toward the leaves of the tree consisting of a sequence
    /// of one or more valid blocks consistent with consensus rules,
    /// is called a valid block chain.
    ///
    /// In order to choose the best valid block chain in its view of the overall block tree,
    /// a node sums the work ... of all blocks in each valid block chain,
    /// and considers the valid block chain with greatest total work to be best.
    ///
    /// To break ties between leaf blocks, a node will prefer the block that it received first.
    ///
    /// The consensus protocol is designed to ensure that for any given block height,
    /// the vast majority of nodes should eventually agree on their best valid block chain
    /// up to that height."
    ///
    /// <https://zips.z.cash/protocol/protocol.pdf#blockchain>
    ///
    /// # Correctness
    ///
    /// `Chain::cmp` is used in a `BTreeSet`, so the fields accessed by `cmp` must not have
    /// interior mutability.
    ///
    /// # Panics
    ///
    /// If two chains compare equal.
    ///
    /// This panic enforces the [`NonFinalizedState::chain_set`][2] unique chain invariant.
    ///
    /// If the chain set contains duplicate chains, the non-finalized state might
    /// handle new blocks or block finalization incorrectly.
    ///
    /// [1]: super::NonFinalizedState
    /// [2]: super::NonFinalizedState::chain_set
    fn cmp(&self, other: &Self) -> Ordering {
        if self.partial_cumulative_work != other.partial_cumulative_work {
            self.partial_cumulative_work
                .cmp(&other.partial_cumulative_work)
        } else {
            let self_hash = self
                .blocks
                .values()
                .last()
                .expect("always at least 1 element")
                .hash;

            let other_hash = other
                .blocks
                .values()
                .last()
                .expect("always at least 1 element")
                .hash;

            // This comparison is a tie-breaker within the local node, so it does not need to
            // be consistent with the ordering on `ExpandedDifficulty` and `block::Hash`.
            match self_hash.0.cmp(&other_hash.0) {
                Ordering::Equal => unreachable!("Chain tip block hashes are always unique"),
                ordering => ordering,
            }
        }
    }
}

impl PartialOrd for Chain {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl PartialEq for Chain {
    /// Chain equality for [`NonFinalizedState::chain_set`][1], using proof of
    /// work, then the tip block hash as a tie-breaker.
    ///
    /// # Panics
    ///
    /// If two chains compare equal.
    ///
    /// See [`Chain::cmp`] for details.
    ///
    /// [1]: super::NonFinalizedState::chain_set
    fn eq(&self, other: &Self) -> bool {
        self.partial_cmp(other) == Some(Ordering::Equal)
    }
}

impl Eq for Chain {}

#[cfg(test)]
impl Chain {
    /// Inserts the supplied Sapling note commitment subtree into the chain.
    pub(crate) fn insert_sapling_subtree(
        &mut self,
        subtree: NoteCommitmentSubtree<sapling::tree::Node>,
    ) {
        self.inner
            .sapling_subtrees
            .insert(subtree.index, subtree.into_data());
    }

    /// Inserts the supplied Orchard note commitment subtree into the chain.
    pub(crate) fn insert_orchard_subtree(
        &mut self,
        subtree: NoteCommitmentSubtree<orchard::tree::Node>,
    ) {
        self.inner
            .orchard_subtrees
            .insert(subtree.index, subtree.into_data());
    }
}