Skip to content

Commit f031138

Browse files
authored
Add skipDuplicates parameter on @entity directive (#6458)
* graph: parse skipDuplicates from @entity directive Add kw::SKIP_DUPLICATES constant, skip_duplicates bool field to ObjectType, and parsing logic in ObjectType::new() defaulting to false when absent. * graph: validate skipDuplicates directive argument Added SkipDuplicatesRequiresImmutable error variant, bool_arg validation for skipDuplicates in validate_entity_directives(), and three test functions covering non-boolean value, mutable entity, and timeseries+skipDuplicates. * graph: expose skip_duplicates on EntityType and InputSchema APIs Add TypeInfo::skip_duplicates(), InputSchema::skip_duplicates(), and EntityType::skip_duplicates() following the is_immutable() three-layer delegation pattern. Object types return immutable && skip_duplicates; Interface and Aggregation types return false. * graph, store: propagate skip_duplicates to RowGroup Add skip_duplicates: bool field to RowGroup struct alongside immutable, update RowGroup::new() to accept the parameter, and wire it from entity_type.skip_duplicates() in RowGroups::group_entry(). All other call sites (RowGroupForPerfTest, test helpers, example) pass false. * graph, store: lenient write-batch enforcement for skip_duplicates Modify RowGroup::append_row() so that when immutable=true and skip_duplicates=true, cross-block duplicate inserts and Overwrite/Remove operations log warnings and return Ok(()) instead of failing. Same-block duplicates remain allowed. Default behavior (skip_duplicates=false) is preserved exactly. Added Logger field to RowGroup/RowGroups with CacheWeight impl, threaded through all construction sites. 5 unit tests covering all scenarios. * store: propagate skip_duplicates to Table Added skip_duplicates: bool field to Table struct in relational.rs, wired from EntityType::skip_duplicates() in Table::new(), copied in Table::new_like(), and defaulted to false in make_poi_table(). * store: lenient store-layer enforcement for skip_duplicates Added logger parameter to Layout::update() and Layout::delete() in relational.rs. When table.immutable && table.skip_duplicates, these methods now log a warning and return Ok(0) instead of returning an error. Default immutable behavior (skip_duplicates=false) is preserved. Updated all callers including deployment_store.rs and test files. Added 4 unit tests with SkipDupMink entity type to verify both skip_duplicates and default immutable behavior. * store: ON CONFLICT DO NOTHING for skip_duplicates inserts Add conditional ON CONFLICT (primary_key) DO NOTHING clause to InsertQuery::walk_ast() when table.immutable && table.skip_duplicates. This handles cross-batch duplicates where an entity committed in a previous batch is re-inserted due to the immutable entity cache skipping store lookups. Two unit tests verify: skip_duplicates inserts include ON CONFLICT, default immutable inserts do not. * store: log cross-batch duplicate inserts in Layout::insert() Restructured Layout::insert() from if-let-Err to match to capture affected_rows from InsertQuery::execute(). Tracks expected vs actual row counts across both the main batch path and the row-by-row fallback path. Logs a warning when affected_rows < expected for skip_duplicates immutable tables. * tests: runner test for skipDuplicates immutable entities Add end-to-end runner test exercising @entity(immutable: true, skipDuplicates: true) with duplicate entity inserts across blocks. - tests/runner-tests/skip-duplicates/: subgraph with Ping entity using skipDuplicates, block handler that saves same ID every block - tests/tests/runner_tests.rs: skip_duplicates() test syncs 4 blocks and verifies entity is queryable (indexing did not fail) * graph, store: address review comments - Remove logs and the logger from the RowGroup - Simplify schema tests - Add ObjectMutability enum instead of two booleans
1 parent 4a00e8c commit f031138

22 files changed

Lines changed: 500 additions & 76 deletions

File tree

graph/examples/append_row.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ pub fn main() -> anyhow::Result<()> {
104104
};
105105
mods.push(md);
106106
}
107-
let mut group = RowGroup::new(THING_TYPE.clone(), false);
107+
let mut group = RowGroup::new(THING_TYPE.clone());
108108

109109
let start = Instant::now();
110110
for md in mods {

graph/src/components/store/write.rs

Lines changed: 99 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ use std::{
44
sync::Arc,
55
};
66

7+
use slog::Logger;
8+
79
use crate::{
810
blockchain::{block_stream::FirehoseCursor, BlockPtr, BlockTime},
911
cheap_clone::CheapClone,
@@ -338,24 +340,22 @@ impl LastMod {
338340
#[derive(Debug, CacheWeight)]
339341
pub struct RowGroup {
340342
pub entity_type: EntityType,
343+
341344
/// All changes for this entity type, ordered by block; i.e., if `i < j`
342345
/// then `rows[i].block() <= rows[j].block()`. Several methods on this
343346
/// struct rely on the fact that this ordering is observed.
344347
rows: Vec<EntityModification>,
345348

346-
immutable: bool,
347-
348349
/// Map the `key.entity_id` of all entries in `rows` to the index with
349350
/// the most recent entry for that id to speed up lookups.
350351
last_mod: LastMod,
351352
}
352353

353354
impl RowGroup {
354-
pub fn new(entity_type: EntityType, immutable: bool) -> Self {
355+
pub fn new(entity_type: EntityType) -> Self {
355356
Self {
356357
entity_type,
357358
rows: Vec::new(),
358-
immutable,
359359
last_mod: LastMod::new(),
360360
}
361361
}
@@ -471,7 +471,7 @@ impl RowGroup {
471471
/// Append `row` to `self.rows` by combining it with a previously
472472
/// existing row, if that is possible
473473
fn append_row(&mut self, row: EntityModification) -> Result<(), StoreError> {
474-
if self.immutable {
474+
if self.entity_type.is_immutable() {
475475
match row {
476476
EntityModification::Insert { .. } => {
477477
// Check if this is an attempt to overwrite an immutable
@@ -488,6 +488,9 @@ impl RowGroup {
488488
.and_then(|&idx| self.rows.get(idx))
489489
{
490490
Some(prev) if prev.block() != row.block() => {
491+
if self.entity_type.skip_duplicates() {
492+
return Ok(());
493+
}
491494
return Err(StoreError::Input(
492495
format!("entity {} is immutable; inserting it at block {} is not possible as it was already inserted at block {}",
493496
row.key(), row.block(), prev.block())));
@@ -498,6 +501,9 @@ impl RowGroup {
498501
self.push_row(row);
499502
}
500503
EntityModification::Overwrite { .. } | EntityModification::Remove { .. } => {
504+
if self.entity_type.skip_duplicates() {
505+
return Ok(());
506+
}
501507
return Err(internal_error!(
502508
"immutable entity type {} only allows inserts, not {:?}",
503509
self.entity_type,
@@ -604,8 +610,8 @@ impl RowGroup {
604610
pub struct RowGroupForPerfTest(RowGroup);
605611

606612
impl RowGroupForPerfTest {
607-
pub fn new(entity_type: EntityType, immutable: bool) -> Self {
608-
Self(RowGroup::new(entity_type, immutable))
613+
pub fn new(entity_type: EntityType) -> Self {
614+
Self(RowGroup::new(entity_type))
609615
}
610616

611617
pub fn push(&mut self, emod: EntityModification, block: BlockNumber) -> Result<(), StoreError> {
@@ -661,11 +667,15 @@ impl<'a> Iterator for ClampsByBlockIterator<'a> {
661667
#[derive(Debug, CacheWeight)]
662668
pub struct RowGroups {
663669
pub groups: Vec<RowGroup>,
670+
logger: Logger,
664671
}
665672

666673
impl RowGroups {
667-
fn new() -> Self {
668-
Self { groups: Vec::new() }
674+
fn new(logger: Logger) -> Self {
675+
Self {
676+
groups: Vec::new(),
677+
logger,
678+
}
669679
}
670680

671681
fn group(&self, entity_type: &EntityType) -> Option<&RowGroup> {
@@ -684,9 +694,7 @@ impl RowGroups {
684694
match pos {
685695
Some(pos) => &mut self.groups[pos],
686696
None => {
687-
let immutable = entity_type.is_immutable();
688-
self.groups
689-
.push(RowGroup::new(entity_type.clone(), immutable));
697+
self.groups.push(RowGroup::new(entity_type.clone()));
690698
// unwrap: we just pushed an entry
691699
self.groups.last_mut().unwrap()
692700
}
@@ -784,6 +792,7 @@ impl Batch {
784792
deterministic_errors: Vec<SubgraphError>,
785793
offchain_to_remove: Vec<StoredDynamicDataSource>,
786794
is_non_fatal_errors_active: bool,
795+
logger: Logger,
787796
) -> Result<Self, StoreError> {
788797
let block = block_ptr.number;
789798

@@ -797,7 +806,7 @@ impl Batch {
797806
EntityModification::Remove { .. } => 0,
798807
});
799808

800-
let mut mods = RowGroups::new();
809+
let mut mods = RowGroups::new(logger);
801810

802811
for m in raw_mods {
803812
mods.group_entry(&m.key().entity_type).push(m, block)?;
@@ -1079,7 +1088,6 @@ mod test {
10791088
let group = RowGroup {
10801089
entity_type: ENTRY_TYPE.clone(),
10811090
rows,
1082-
immutable: false,
10831091
last_mod,
10841092
};
10851093
let act = group
@@ -1120,6 +1128,8 @@ mod test {
11201128
type Thing @entity { id: ID!, count: Int! }
11211129
type RowGroup @entity { id: ID! }
11221130
type Entry @entity { id: ID! }
1131+
type ImmThing @entity(immutable: true) { id: ID!, count: Int! }
1132+
type SkipDupThing @entity(immutable: true, skipDuplicates: true) { id: ID!, count: Int! }
11231133
"#;
11241134
lazy_static! {
11251135
static ref DEPLOYMENT: DeploymentHash = DeploymentHash::new("batchAppend").unwrap();
@@ -1128,6 +1138,8 @@ mod test {
11281138
static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap();
11291139
static ref ROW_GROUP_TYPE: EntityType = SCHEMA.entity_type("RowGroup").unwrap();
11301140
static ref ENTRY_TYPE: EntityType = SCHEMA.entity_type("Entry").unwrap();
1141+
static ref IMM_THING_TYPE: EntityType = SCHEMA.entity_type("ImmThing").unwrap();
1142+
static ref SKIP_DUP_THING_TYPE: EntityType = SCHEMA.entity_type("SkipDupThing").unwrap();
11311143
}
11321144

11331145
/// Convenient notation for changes to a fixed entity
@@ -1187,7 +1199,7 @@ mod test {
11871199
impl Group {
11881200
fn new() -> Self {
11891201
Self {
1190-
group: RowGroup::new(THING_TYPE.clone(), false),
1202+
group: RowGroup::new(THING_TYPE.clone()),
11911203
}
11921204
}
11931205

@@ -1292,4 +1304,76 @@ mod test {
12921304
let op = group.last_op(&key, 0);
12931305
assert_eq!(None, op);
12941306
}
1307+
1308+
fn make_insert(entity_type: &EntityType, id: &str, block: BlockNumber) -> EntityModification {
1309+
EntityModification::Insert {
1310+
key: entity_type.parse_key(id).unwrap(),
1311+
data: Arc::new(entity! { SCHEMA => id: id, count: block }),
1312+
block,
1313+
end: None,
1314+
}
1315+
}
1316+
1317+
fn make_overwrite(
1318+
entity_type: &EntityType,
1319+
id: &str,
1320+
block: BlockNumber,
1321+
) -> EntityModification {
1322+
EntityModification::Overwrite {
1323+
key: entity_type.parse_key(id).unwrap(),
1324+
data: Arc::new(entity! { SCHEMA => id: id, count: block }),
1325+
block,
1326+
end: None,
1327+
}
1328+
}
1329+
1330+
fn make_remove(entity_type: &EntityType, id: &str, block: BlockNumber) -> EntityModification {
1331+
EntityModification::Remove {
1332+
key: entity_type.parse_key(id).unwrap(),
1333+
block,
1334+
}
1335+
}
1336+
1337+
#[test]
1338+
fn append_row_immutable_default_rejects_cross_block_duplicate() {
1339+
let mut group = RowGroup::new(IMM_THING_TYPE.clone());
1340+
let res = group.push(make_insert(&IMM_THING_TYPE, "one", 1), 1);
1341+
assert!(res.is_ok());
1342+
let res = group.push(make_insert(&IMM_THING_TYPE, "one", 2), 2);
1343+
assert!(res.is_err());
1344+
}
1345+
1346+
#[test]
1347+
fn append_row_skip_duplicates_allows_cross_block_duplicate() {
1348+
let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone());
1349+
let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 1), 1);
1350+
assert!(res.is_ok());
1351+
let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 2), 2);
1352+
assert!(res.is_ok());
1353+
assert_eq!(group.row_count(), 1);
1354+
}
1355+
1356+
#[test]
1357+
fn append_row_skip_duplicates_allows_overwrite() {
1358+
let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone());
1359+
let res = group.append_row(make_overwrite(&SKIP_DUP_THING_TYPE, "one", 1));
1360+
assert!(res.is_ok());
1361+
}
1362+
1363+
#[test]
1364+
fn append_row_skip_duplicates_allows_remove() {
1365+
let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone());
1366+
let res = group.append_row(make_remove(&SKIP_DUP_THING_TYPE, "one", 1));
1367+
assert!(res.is_ok());
1368+
}
1369+
1370+
#[test]
1371+
fn append_row_skip_duplicates_same_block_still_pushes() {
1372+
let mut group = RowGroup::new(SKIP_DUP_THING_TYPE.clone());
1373+
let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 1), 1);
1374+
assert!(res.is_ok());
1375+
let res = group.push(make_insert(&SKIP_DUP_THING_TYPE, "one", 1), 1);
1376+
assert!(res.is_ok());
1377+
assert_eq!(group.row_count(), 2);
1378+
}
12951379
}

graph/src/schema/entity_type.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ impl EntityType {
6868
self.schema.is_immutable(self.atom)
6969
}
7070

71+
pub fn skip_duplicates(&self) -> bool {
72+
self.schema.skip_duplicates(self.atom)
73+
}
74+
7175
pub fn id_type(&self) -> Result<IdType, Error> {
7276
self.schema.id_type(self.atom)
7377
}

0 commit comments

Comments
 (0)