Skip to content

Commit

Permalink
Merge pull request automerge#347 from jeffa5/observer-counters
Browse files Browse the repository at this point in the history
Add increment observation for observer
  • Loading branch information
orionz authored Apr 21, 2022
2 parents 1fc5e55 + 1bee30c commit e36f3c2
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 9 deletions.
7 changes: 7 additions & 0 deletions automerge-wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,13 @@ impl Automerge {
};
}

Patch::Increment { obj, key, value } => {
js_set(&patch, "action", "increment")?;
js_set(&patch, "obj", obj.to_string())?;
js_set(&patch, "key", key)?;
js_set(&patch, "value", value.0)?;
}

Patch::Delete { obj, key } => {
js_set(&patch, "action", "delete")?;
js_set(&patch, "obj", obj.to_string())?;
Expand Down
30 changes: 24 additions & 6 deletions automerge-wasm/test/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -956,15 +956,15 @@ describe('Automerge', () => {
doc1.free()
})

it.skip('should capture local increment ops', () => {
it('should capture local increment ops', () => {
let doc1 = create('aaaa')
doc1.enablePatches(true)
doc1.put('_root', 'counter', 2, 'counter')
doc1.increment('_root', 'counter', 4)

assert.deepEqual(doc1.popPatches(), [
{action: 'put', obj: '_root', key: 'counter', value: 2, datatype: 'counter', conflict: false},
{action: 'put', obj: '_root', key: 'counter', value: 6, datatype: 'counter', conflict: false},
{action: 'increment', obj: '_root', key: 'counter', value: 4},
])
doc1.free()
})
Expand All @@ -986,23 +986,41 @@ describe('Automerge', () => {
doc1.free()
})

it.skip('should support counters in a map', () => {
it('should support counters in a map', () => {
let doc1 = create('aaaa'), doc2 = create('bbbb')
doc2.enablePatches(true)
doc1.put('_root', 'starlings', 2, 'counter')
doc2.loadIncremental(doc1.saveIncremental())
doc1.increment('_root', 'starlings', 1)
doc1.dump()
doc2.loadIncremental(doc1.saveIncremental())
assert.deepEqual(doc2.get('_root', 'starlings'), ['counter', 3])
assert.deepEqual(doc2.popPatches(), [
{action: 'put', obj: '_root', key: 'starlings', value: 2, datatype: 'counter', conflict: false},
{action: 'put', obj: '_root', key: 'starlings', value: 3, datatype: 'counter', conflict: false}
{action: 'increment', obj: '_root', key: 'starlings', value: 1}
])
doc1.free(); doc2.free()
})

it('should support counters in a list') // TODO
it('should support counters in a list', () => {
let doc1 = create('aaaa'), doc2 = create('bbbb')
doc2.enablePatches(true)
const list = doc1.putObject('_root', 'list', [])
doc2.loadIncremental(doc1.saveIncremental())
doc1.insert(list, 0, 1, 'counter')
doc2.loadIncremental(doc1.saveIncremental())
doc1.increment(list, 0, 2)
doc2.loadIncremental(doc1.saveIncremental())
doc1.increment(list, 0, -5)
doc2.loadIncremental(doc1.saveIncremental())

assert.deepEqual(doc2.popPatches(), [
{action: 'put', obj: '_root', key: 'list', value: list, datatype: 'list', conflict: false},
{action: 'insert', obj: list, key: 0, value: 1, datatype: 'counter'},
{action: 'increment', obj: list, key: 0, value: 2},
{action: 'increment', obj: list, key: 0, value: -5},
])
doc1.free(); doc2.free()
})

it('should delete a counter from a map') // TODO
})
Expand Down
9 changes: 9 additions & 0 deletions automerge/examples/watch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@ fn get_changes(doc: &Automerge, patches: Vec<Patch>) {
doc.path_to_object(&obj)
)
}
Patch::Increment { obj, key, value } => {
println!(
"increment {:?} in obj {:?} by {:?}, object path {:?}",
key,
obj,
value,
doc.path_to_object(&obj)
)
}
Patch::Delete { obj, key } => println!(
"delete {:?} in obj {:?}, object path {:?}",
key,
Expand Down
96 changes: 94 additions & 2 deletions automerge/src/automerge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,7 @@ impl Automerge {

pub fn dump(&self) {
log!(
" {:12} {:12} {:12} {} {} {}",
" {:12} {:12} {:12} {:12} {:12} {:12}",
"id",
"obj",
"key",
Expand All @@ -1028,7 +1028,7 @@ impl Automerge {
let pred: Vec<_> = op.pred.iter().map(|id| self.to_string(*id)).collect();
let succ: Vec<_> = op.succ.iter().map(|id| self.to_string(*id)).collect();
log!(
" {:12} {:12} {:12} {} {:?} {:?}",
" {:12} {:12} {:12} {:12} {:12?} {:12?}",
id,
obj,
key,
Expand Down Expand Up @@ -2005,4 +2005,96 @@ mod tests {
let len = doc.length(&text);
assert_eq!(len, 4); // 4 chars
}

#[test]
fn observe_counter_change_application_overwrite() {
let mut doc1 = AutoCommit::new();
doc1.set_actor(ActorId::from([1]));
doc1.put(ROOT, "counter", ScalarValue::counter(1)).unwrap();
doc1.commit();

let mut doc2 = doc1.fork();
doc2.set_actor(ActorId::from([2]));
doc2.put(ROOT, "counter", "mystring").unwrap();
doc2.commit();

doc1.increment(ROOT, "counter", 2).unwrap();
doc1.commit();
doc1.increment(ROOT, "counter", 5).unwrap();
doc1.commit();

let mut observer = VecOpObserver::default();
let mut doc3 = doc1.clone();
doc3.merge_with(
&mut doc2,
ApplyOptions::default().with_op_observer(&mut observer),
)
.unwrap();

assert_eq!(
observer.take_patches(),
vec![Patch::Put {
obj: ExId::Root,
key: Prop::Map("counter".into()),
value: (
ScalarValue::Str("mystring".into()).into(),
ExId::Id(2, doc2.get_actor().clone(), 1)
),
conflict: false
}]
);

let mut observer = VecOpObserver::default();
let mut doc4 = doc2.clone();
doc4.merge_with(
&mut doc1,
ApplyOptions::default().with_op_observer(&mut observer),
)
.unwrap();

// no patches as the increments operate on an invisible counter
assert_eq!(observer.take_patches(), vec![]);
}

#[test]
fn observe_counter_change_application() {
let mut doc = AutoCommit::new();
doc.put(ROOT, "counter", ScalarValue::counter(1)).unwrap();
doc.increment(ROOT, "counter", 2).unwrap();
doc.increment(ROOT, "counter", 5).unwrap();
let changes = doc.get_changes(&[]).into_iter().cloned().collect();

let mut new_doc = AutoCommit::new();
let mut observer = VecOpObserver::default();
new_doc
.apply_changes_with(
changes,
ApplyOptions::default().with_op_observer(&mut observer),
)
.unwrap();
assert_eq!(
observer.take_patches(),
vec![
Patch::Put {
obj: ExId::Root,
key: Prop::Map("counter".into()),
value: (
ScalarValue::counter(1).into(),
ExId::Id(1, doc.get_actor().clone(), 0)
),
conflict: false
},
Patch::Increment {
obj: ExId::Root,
key: Prop::Map("counter".into()),
value: (2, ExId::Id(2, doc.get_actor().clone(), 0)),
},
Patch::Increment {
obj: ExId::Root,
key: Prop::Map("counter".into()),
value: (5, ExId::Id(3, doc.get_actor().clone(), 0)),
}
]
);
}
}
28 changes: 28 additions & 0 deletions automerge/src/op_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ pub trait OpObserver {
/// - `conflict`: whether this put conflicts with other operations.
fn put(&mut self, objid: ExId, key: Prop, tagged_value: (Value, ExId), conflict: bool);

/// A counter has been incremented.
///
/// - `objid`: the object that contains the counter.
/// - `key`: they key that the chounter is at.
/// - `tagged_value`: the amount the counter has been incremented by, and the the id of the
/// increment operation.
fn increment(&mut self, objid: ExId, key: Prop, tagged_value: (i64, ExId));

/// A value has beeen deleted.
///
/// - `objid`: the object that has been deleted in.
Expand All @@ -33,6 +41,8 @@ impl OpObserver for () {

fn put(&mut self, _objid: ExId, _key: Prop, _tagged_value: (Value, ExId), _conflict: bool) {}

fn increment(&mut self, _objid: ExId, _key: Prop, _tagged_value: (i64, ExId)) {}

fn delete(&mut self, _objid: ExId, _key: Prop) {}
}

Expand Down Expand Up @@ -68,6 +78,14 @@ impl OpObserver for VecOpObserver {
});
}

fn increment(&mut self, objid: ExId, key: Prop, tagged_value: (i64, ExId)) {
self.patches.push(Patch::Increment {
obj: objid,
key,
value: tagged_value,
});
}

fn delete(&mut self, objid: ExId, key: Prop) {
self.patches.push(Patch::Delete { obj: objid, key })
}
Expand Down Expand Up @@ -96,6 +114,16 @@ pub enum Patch {
/// The value that was inserted, and the id of the operation that inserted it there.
value: (Value<'static>, ExId),
},
/// Incrementing a counter.
Increment {
/// The object that was incremented in.
obj: ExId,
/// The key that was incremented.
key: Prop,
/// The amount that the counter was incremented by, and the id of the operation that
/// did the increment.
value: (i64, ExId),
},
/// Deleting an element from a list/text
Delete {
/// The object that was deleted from.
Expand Down
7 changes: 7 additions & 0 deletions automerge/src/op_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,13 @@ impl OpSetInternal {
} else {
observer.delete(ex_obj, key);
}
} else if let Some(value) = op.get_increment_value() {
// only observe this increment if the counter is visible, i.e. the counter's
// create op is in the values
if values.iter().any(|value| op.pred.contains(&value.id)) {
// we have observed the value
observer.increment(ex_obj, key, (value, self.id_to_exid(op.id)));
}
} else {
let winner = if let Some(last_value) = values.last() {
if self.m.lamport_cmp(op.id, last_value.id) == Ordering::Greater {
Expand Down
16 changes: 16 additions & 0 deletions automerge/src/query/seek_op_with_patch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ impl<'a> TreeQuery<'a> for SeekOpWithPatch<'a> {

// Keep track of any ops we're overwriting and any conflicts on this key
if self.op.overwrites(op) {
// when we encounter an increment op we also want to find the counter for
// it.
if self.op.is_inc() && op.is_counter() && op.visible() {
self.values.push(op);
}
self.succ.push(self.pos);
} else if op.visible() {
self.values.push(op);
Expand All @@ -145,6 +150,7 @@ impl<'a> TreeQuery<'a> for SeekOpWithPatch<'a> {
if m.lamport_cmp(op.id, self.op.id) == Ordering::Greater {
break;
}

self.pos += 1;
}

Expand Down Expand Up @@ -178,6 +184,11 @@ impl<'a> TreeQuery<'a> for SeekOpWithPatch<'a> {
if self.is_target_insert(e) {
self.found = true;
if self.op.overwrites(e) {
// when we encounter an increment op we also want to find the counter for
// it.
if self.op.is_inc() && e.is_counter() && e.visible() {
self.values.push(e);
}
self.succ.push(self.pos);
}
if e.visible() {
Expand All @@ -190,6 +201,11 @@ impl<'a> TreeQuery<'a> for SeekOpWithPatch<'a> {
// Once we've found the reference element, keep track of any ops that we're overwriting
let overwritten = self.op.overwrites(e);
if overwritten {
// when we encounter an increment op we also want to find the counter for
// it.
if self.op.is_inc() && e.is_counter() && e.visible() {
self.values.push(e);
}
self.succ.push(self.pos);
}

Expand Down
2 changes: 2 additions & 0 deletions automerge/src/transaction/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ impl TransactionInner {
}
} else if op.is_delete() {
observer.delete(ex_obj, prop.clone());
} else if let Some(value) = op.get_increment_value() {
observer.increment(ex_obj, prop.clone(), (value, doc.id_to_exid(op.id)));
} else {
let value = (op.value(), doc.ops.id_to_exid(op.id));
observer.put(ex_obj, prop.clone(), value, false);
Expand Down
8 changes: 8 additions & 0 deletions automerge/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,14 @@ impl Op {
}
}

pub fn get_increment_value(&self) -> Option<i64> {
if let OpType::Increment(i) = self.action {
Some(i)
} else {
None
}
}

pub fn value(&self) -> Value {
match &self.action {
OpType::Make(obj_type) => Value::Object(*obj_type),
Expand Down
19 changes: 18 additions & 1 deletion automerge/tests/test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use automerge::transaction::Transactable;
use automerge::{
ActorId, AutoCommit, Automerge, AutomergeError, ObjType, ScalarValue, Value, ROOT,
ActorId, ApplyOptions, AutoCommit, Automerge, AutomergeError, ObjType, ScalarValue, Value, VecOpObserver, ROOT,
};

mod helpers;
Expand Down Expand Up @@ -930,6 +930,23 @@ fn list_counter_del() -> Result<(), automerge::AutomergeError> {
Ok(())
}

#[test]
fn observe_counter_change_application() {
let mut doc = AutoCommit::new();
doc.put(ROOT, "counter", ScalarValue::counter(1)).unwrap();
doc.increment(ROOT, "counter", 2).unwrap();
doc.increment(ROOT, "counter", 5).unwrap();
let changes = doc.get_changes(&[]).into_iter().cloned().collect();

let mut doc = AutoCommit::new();
let mut observer = VecOpObserver::default();
doc.apply_changes_with(
changes,
ApplyOptions::default().with_op_observer(&mut observer),
)
.unwrap();
}

#[test]
fn increment_non_counter_map() {
let mut doc = AutoCommit::new();
Expand Down

0 comments on commit e36f3c2

Please sign in to comment.