diff --git a/node/hypergraph/inmem/hypergraph_crdt.go b/node/hypergraph/inmem/hypergraph_crdt.go new file mode 100644 index 0000000..d476cbd --- /dev/null +++ b/node/hypergraph/inmem/hypergraph_crdt.go @@ -0,0 +1,242 @@ +package inmem + +import "errors" + +var ErrInvalidLocation error = errors.New("invalid location") +var ErrMissingExtrinsics error = errors.New("missing extrinsics") +var ErrIsExtrinsic error = errors.New("is extrinsic") + +type HypergraphCRDT struct { + locations map[Location]struct{} + + vertexAdds map[Location]*IdSet + vertexRemoves map[Location]*IdSet + hyperedgeAdds map[Location]*IdSet + hyperedgeRemoves map[Location]*IdSet +} + +func NewHypergraphCRDT(locations []Location) *HypergraphCRDT { + hypergraph := &HypergraphCRDT{ + locations: make(map[Location]struct{}), + vertexAdds: make(map[Location]*IdSet), + vertexRemoves: make(map[Location]*IdSet), + hyperedgeAdds: make(map[Location]*IdSet), + hyperedgeRemoves: make(map[Location]*IdSet), + } + + for _, l := range locations { + hypergraph.locations[l] = struct{}{} + hypergraph.vertexAdds[l] = NewIdSet("vertex") + hypergraph.vertexRemoves[l] = NewIdSet("vertex") + hypergraph.hyperedgeAdds[l] = NewIdSet("hyperedge") + hypergraph.hyperedgeRemoves[l] = NewIdSet("hyperedge") + } + + return hypergraph +} + +func (hg *HypergraphCRDT) AddAtom(a Atom) error { + switch v := a.(type) { + case *Vertex: + hg.AddVertex(v) + return nil + case *Hyperedge: + return hg.AddHyperedge(v) + } + + return nil +} + +func (hg *HypergraphCRDT) AddVertex(v *Vertex) { + shardMap := ShardVertex(v) + for location, vertices := range shardMap { + for _, vertex := range vertices.VertexSet.atoms { + if vert, ok := vertex.(*Vertex); ok { + hg.vertexAdds[location].Add(vert) + } + } + } +} + +func (hg *HypergraphCRDT) AddVertexSet(vertices *IdSet) error { + if vertices.atomType != "vertex" { + return ErrInvalidAtomType + } + + shardMap := ShardAtomSet(vertices.atoms) + for location, vertices := range shardMap { + for _, vertex := range vertices.VertexSet.atoms { + if vert, ok := vertex.(*Vertex); ok { + hg.vertexAdds[location].Add(vert) + } + } + } + + return nil +} + +func (hg *HypergraphCRDT) AddHyperedge(h *Hyperedge) error { + if hg.LookupAtomSet(h.extrinsics) { + shardMap := ShardHyperedge(h) + + for location, set := range shardMap { + for _, hyperedge := range set.HyperedgeSet.atoms { + if he, ok := hyperedge.(*Hyperedge); ok { + hg.hyperedgeAdds[location].Add(he) + } + } + for _, vertex := range set.VertexSet.atoms { + if v, ok := vertex.(*Vertex); ok { + hg.hyperedgeAdds[location].Add(v) + } + } + } + + return nil + } else { + return ErrMissingExtrinsics + } +} + +func (hg *HypergraphCRDT) RemoveAtom(a Atom) error { + switch v := a.(type) { + case *Vertex: + return hg.RemoveVertex(v) + case *Hyperedge: + return hg.RemoveHyperedge(v) + } + + return nil +} + +func (hg *HypergraphCRDT) RemoveVertex(v *Vertex) error { + if hg.LookupVertex(v) { + for l, hyperedgeAdds := range hg.hyperedgeAdds { + for _, hyperedge := range hyperedgeAdds.atoms { + he, ok := hyperedge.(*Hyperedge) + if !ok { + continue + } + if !hg.hyperedgeRemoves[l].Has(he) { + if _, ok := he.extrinsics[v.GetID()]; ok { + return ErrIsExtrinsic + } + } + } + } + + hg.vertexRemoves[v.location].Add(v) + } + + return nil +} + +func (hg *HypergraphCRDT) RemoveHyperedge(h *Hyperedge) error { + if hg.LookupAtom(h) { + for l, hyperedgeAdds := range hg.hyperedgeAdds { + for _, hyperedge := range hyperedgeAdds.atoms { + he, ok := hyperedge.(*Hyperedge) + if !ok || hg.hyperedgeRemoves[l].Has(he) { + continue + } + if _, ok := he.extrinsics[h.GetID()]; ok { + return ErrIsExtrinsic + } + } + } + + hg.hyperedgeRemoves[h.location].Add(h) + } + + return nil +} + +func (hg *HypergraphCRDT) LookupAtom(a Atom) bool { + if _, ok := hg.locations[a.GetLocation()]; !ok { + return false + } + + switch v := a.(type) { + case *Vertex: + return hg.LookupVertex(v) + case *Hyperedge: + return hg.LookupHyperedge(v) + default: + return false + } +} + +// LookupAtomSet checks all atoms in an IdSet to see if they all can be looked +// up successfully. +func (hg *HypergraphCRDT) LookupAtomSet(atomSet map[string]Atom) bool { + for _, atom := range atomSet { + if !hg.LookupAtom(atom) { + return false + } + } + return true +} + +// LookupVertex checks if a vertex is added and not removed in the current +// location. +func (hg *HypergraphCRDT) LookupVertex(v *Vertex) bool { + location := v.GetLocation() + return hg.vertexAdds[location].Has(v) && !hg.vertexRemoves[location].Has(v) +} + +// LookupHyperedge checks if a hyperedge and its extrinsics can be looked up. +func (hg *HypergraphCRDT) LookupHyperedge(h *Hyperedge) bool { + return hg.LookupAtomSet(h.extrinsics) && + hg.hyperedgeAdds[h.GetLocation()].Has(h) && + !hg.hyperedgeRemoves[h.GetLocation()].Has(h) +} + +// Within checks if atom `a` is within hyperedge `h` directly or transitively. +func (hg *HypergraphCRDT) Within(a, h Atom) bool { + switch ha := h.(type) { + case *Hyperedge: + _, ok := ha.extrinsics[a.GetID()] + if ok || a.GetID() == h.GetID() { + return true + } + for _, extrinsic := range ha.extrinsics { + if he, ok := extrinsic.(*Hyperedge); ok { + for _, hyperExtrinsic := range he.extrinsics { + if hyperHe, ok := hyperExtrinsic.(*Hyperedge); ok { + if hg.LookupHyperedge(hyperHe) { + if _, ok := hyperHe.extrinsics[a.GetID()]; ok && + hg.Within(hyperHe, h) { + return true + } + } + } + } + } + } + } + return false +} + +// GetReconciledVertexSet computes the set of vertices that have been added but +// not removed for a location. +func (hg *HypergraphCRDT) GetReconciledVertexSet(l Location) *IdSet { + vertices := NewIdSet("vertex") + for _, v := range hg.vertexAdds[l].atoms { + if !hg.vertexRemoves[l].Has(v) { + vertices.Add(v) + } + } + return vertices +} + +// GetReconciledHyperedgeSet computes the set of hyperedges that have been added +// but not removed for a location. +func (hg *HypergraphCRDT) GetReconciledHyperedgeSet(l Location) *IdSet { + hyperedges := NewIdSet("hyperedge") + for _, h := range hg.hyperedgeAdds[l].atoms { + if !hg.hyperedgeRemoves[l].Has(h) { + hyperedges.Add(h) + } + } + return hyperedges +} diff --git a/node/hypergraph/inmem/hypergraph_test.go b/node/hypergraph/inmem/hypergraph_test.go new file mode 100644 index 0000000..16a2327 --- /dev/null +++ b/node/hypergraph/inmem/hypergraph_test.go @@ -0,0 +1,75 @@ +package inmem_test + +import ( + "testing" + + "github.com/stretchr/testify/assert" + hypergraph "source.quilibrium.com/quilibrium/monorepo/node/hypergraph/inmem" +) + +func TestIdSet(t *testing.T) { + v := hypergraph.NewVertex("1", "here") + h := hypergraph.NewHyperedge("2", "here", make(map[string]hypergraph.Atom)) + + vset := hypergraph.NewIdSet("vertex") + hset := hypergraph.NewIdSet("hyperedge") + assert.NoError(t, vset.Add(v)) + assert.NoError(t, hset.Add(h)) + + assert.True(t, vset.Has(v)) + assert.True(t, hset.Has(h)) + + vset.Delete(v) + assert.False(t, hset.Has(v)) +} + +func TestCRDT(t *testing.T) { + loc1 := hypergraph.Location("here1") + loc2 := hypergraph.Location("here2") + hg := hypergraph.NewHypergraphCRDT([]hypergraph.Location{loc1, loc2}) + + v1 := hypergraph.NewVertex("1", loc1) + v2 := hypergraph.NewVertex("2", loc2) + h1 := hypergraph.NewHyperedge("h1", loc1, make(map[string]hypergraph.Atom)) + + hg.AddVertex(v1) + hg.AddVertex(v2) + hg.AddHyperedge(h1) + h2vs := map[string]hypergraph.Atom{} + h2vs["1"] = v1 + h2vs["2"] = v2 + h2 := hypergraph.NewHyperedge("h2", loc2, h2vs) + hg.AddHyperedge(h2) + + h3vs := map[string]hypergraph.Atom{} + h3vs["h2"] = h2 + h3 := hypergraph.NewHyperedge("h3", loc1, h3vs) + hg.AddHyperedge(h3) + + assert.NotNil(t, hg.LookupVertex(v1)) + assert.NotNil(t, hg.LookupVertex(v2)) + assert.NotNil(t, hg.LookupHyperedge(h1)) + assert.NotNil(t, hg.LookupHyperedge(h2)) + assert.NotNil(t, hg.LookupHyperedge(h3)) + + assert.True(t, hg.GetReconciledVertexSet(v1.GetLocation()).Has(v1)) + assert.False(t, hg.GetReconciledVertexSet(v1.GetLocation()).Has(v2)) + assert.True(t, hg.GetReconciledVertexSet(v2.GetLocation()).Has(v2)) + assert.True(t, hg.GetReconciledHyperedgeSet(v1.GetLocation()).Has(h1)) + assert.False(t, hg.GetReconciledHyperedgeSet(h1.GetLocation()).Has(h2)) + assert.True(t, hg.GetReconciledHyperedgeSet(h2.GetLocation()).Has(h2)) + assert.True(t, hg.GetReconciledHyperedgeSet(h3.GetLocation()).Has(h3)) + + assert.Error(t, hg.RemoveHyperedge(h2)) + assert.True(t, hg.GetReconciledHyperedgeSet(h2.GetLocation()).Has(h2)) + assert.NoError(t, hg.RemoveHyperedge(h3)) + assert.False(t, hg.GetReconciledHyperedgeSet(h3.GetLocation()).Has(h3)) + assert.Error(t, hg.RemoveVertex(v1)) + assert.True(t, hg.GetReconciledVertexSet(v1.GetLocation()).Has(v1)) + assert.NoError(t, hg.RemoveHyperedge(h2)) + assert.False(t, hg.GetReconciledHyperedgeSet(h2.GetLocation()).Has(h2)) + assert.NoError(t, hg.RemoveVertex(v1)) + assert.False(t, hg.GetReconciledVertexSet(v1.GetLocation()).Has(v1)) + assert.NoError(t, hg.RemoveVertex(v2)) + assert.False(t, hg.GetReconciledVertexSet(v2.GetLocation()).Has(v2)) +} diff --git a/node/hypergraph/inmem/shard.go b/node/hypergraph/inmem/shard.go new file mode 100644 index 0000000..df88ff5 --- /dev/null +++ b/node/hypergraph/inmem/shard.go @@ -0,0 +1,89 @@ +package inmem + +func InShard(a Atom, l Location) bool { + return a.GetLocation() == l +} + +type ShardSet struct { + VertexSet *IdSet + HyperedgeSet *IdSet +} + +func ShardAtom(a Atom) map[Location]*ShardSet { + switch atom := a.(type) { + case *Vertex: + return ShardVertex(atom) + case *Hyperedge: + return ShardHyperedge(atom) + default: + return nil + } +} + +func ShardAtomSet(atomSet map[string]Atom) map[Location]*ShardSet { + result := make(map[Location]*ShardSet) + for _, a := range atomSet { + result[a.GetLocation()] = &ShardSet{ + VertexSet: NewIdSet("vertex"), + HyperedgeSet: NewIdSet("hyperedge"), + } + } + + for _, atom := range atomSet { + shard := ShardAtom(atom) + for location, locationShard := range shard { + for _, locationAtom := range locationShard.VertexSet.atoms { + if _, ok := result[location]; !ok { + result[location] = &ShardSet{ + VertexSet: NewIdSet("vertex"), + HyperedgeSet: NewIdSet("hyperedge"), + } + } + result[location].VertexSet.Add(locationAtom) + } + for _, locationAtom := range locationShard.HyperedgeSet.atoms { + if _, ok := result[location]; !ok { + result[location] = &ShardSet{ + VertexSet: NewIdSet("vertex"), + HyperedgeSet: NewIdSet("hyperedge"), + } + } + result[location].HyperedgeSet.Add(locationAtom) + } + } + } + return result +} + +func ShardVertex(v *Vertex) map[Location]*ShardSet { + result := make(map[Location]*ShardSet) + if _, ok := result[v.location]; !ok { + result[v.location] = &ShardSet{ + VertexSet: NewIdSet("vertex"), + HyperedgeSet: NewIdSet("hyperedge"), + } + } + result[v.location].VertexSet.Add(v) + return result +} + +// ShardHyperedge shards a hyperedge and its extrinsics across locations. +func ShardHyperedge(h *Hyperedge) map[Location]*ShardSet { + extrinsicShardSet := ShardAtomSet(h.extrinsics) + result := make(map[Location]*ShardSet) + + for l, s := range extrinsicShardSet { + result[l] = s + } + + if _, ok := result[h.location]; !ok { + result[h.location] = &ShardSet{ + VertexSet: NewIdSet("vertex"), + HyperedgeSet: NewIdSet("hyperedge"), + } + } + + result[h.location].HyperedgeSet.Add(h) + + return result +} diff --git a/node/hypergraph/inmem/types.go b/node/hypergraph/inmem/types.go new file mode 100644 index 0000000..cc5ac79 --- /dev/null +++ b/node/hypergraph/inmem/types.go @@ -0,0 +1,133 @@ +package inmem + +import "errors" + +type AtomType string +type Location string + +var ErrInvalidAtomType error = errors.New("invalid atom type for set") + +type Vertex struct { + id string + location Location +} + +type Hyperedge struct { + id string + location Location + extrinsics map[string]Atom +} + +type Atom interface { + GetID() string + GetAtomType() AtomType + GetLocation() Location +} + +var _v Atom = (*Vertex)(nil) +var _h Atom = (*Hyperedge)(nil) + +func NewVertex(id string, location Location) *Vertex { + return &Vertex{ + id, + location, + } +} + +func NewHyperedge( + id string, + location Location, + extrinsics map[string]Atom, +) *Hyperedge { + return &Hyperedge{ + id, + location, + extrinsics, + } +} + +func (v *Vertex) GetID() string { + return v.id +} + +func (h *Hyperedge) GetID() string { + return h.id +} + +func (v *Vertex) GetAtomType() AtomType { + return "vertex" +} + +func (h *Hyperedge) GetAtomType() AtomType { + return "hyperedge" +} + +func (v *Vertex) GetLocation() Location { + return v.location +} + +func (h *Hyperedge) GetLocation() Location { + return h.location +} + +type IdSet struct { + atomType AtomType + atoms map[string]Atom +} + +func NewIdSet(atomType AtomType) *IdSet { + return &IdSet{atomType: atomType, atoms: make(map[string]Atom)} +} + +// Add adds an atom to the IdSet if it's not already present. +func (set *IdSet) Add(atom Atom) error { + switch a := atom.(type) { + case *Vertex: + if set.atomType != "vertex" { + return ErrInvalidAtomType + } + if _, exists := set.atoms[a.GetID()]; !exists { + set.atoms[a.GetID()] = a + } + case *Hyperedge: + if set.atomType != "hyperedge" { + return ErrInvalidAtomType + } + if _, exists := set.atoms[a.GetID()]; !exists { + set.atoms[a.GetID()] = a + } + } + + return nil +} + +// Delete removes an atom from the IdSet and returns true if the atom was +// present. +func (set *IdSet) Delete(atom Atom) bool { + switch a := atom.(type) { + case *Vertex: + if _, exists := set.atoms[a.GetID()]; exists { + delete(set.atoms, a.GetID()) + return true + } + case *Hyperedge: + if _, exists := set.atoms[a.GetID()]; exists { + delete(set.atoms, a.GetID()) + return true + } + } + return false +} + +// Has checks if an atom is in the IdSet. +func (set *IdSet) Has(atom Atom) bool { + switch a := atom.(type) { + case *Vertex: + _, exists := set.atoms[a.GetID()] + return exists + case *Hyperedge: + _, exists := set.atoms[a.GetID()] + return exists + } + return false +}