From a4baa34ee0003cd680c2ae2642c30ccfc1aa57be Mon Sep 17 00:00:00 2001 From: rhuairahrighairigh Date: Mon, 27 Aug 2018 17:58:58 -0400 Subject: [PATCH] fill out rough implementation --- internal/x/paychan/README.md | 3 + internal/x/paychan/endblocker.go | 19 ++- internal/x/paychan/handler.go | 13 +- internal/x/paychan/keeper.go | 268 +++++++++++++++++++++++++------ internal/x/paychan/types.go | 36 ++++- 5 files changed, 274 insertions(+), 65 deletions(-) diff --git a/internal/x/paychan/README.md b/internal/x/paychan/README.md index c3812ab3..45222831 100644 --- a/internal/x/paychan/README.md +++ b/internal/x/paychan/README.md @@ -7,4 +7,7 @@ Simplifications: TODO + - error handling (getter setter return values? and what happens in failures) - chnge module name to "channel"? + - Find a better name for Queue - clarify distinction between int slice and abstract queue concept + - Do all the small functions need to be methods on the keeper or can they just be floating around? diff --git a/internal/x/paychan/endblocker.go b/internal/x/paychan/endblocker.go index 5a753a59..74947561 100644 --- a/internal/x/paychan/endblocker.go +++ b/internal/x/paychan/endblocker.go @@ -2,12 +2,21 @@ package paychan import () -func EndBlocker(ctx sdk.Context k Keeper) sdk.Tags { +func EndBlocker(ctx sdk.Context, k Keeper) sdk.Tags { - // Iterate through submittedUpdates and for each - // if current block height >= executionDate - // k.CloseChannel(...) + // Iterate through submittedUpdatesQueue + // TODO optimise so it doesn't pull every update from DB every block + var sUpdate SubmittedUpdate + q := k.getSubmittedUpdatesQueue(ctx) + for _, id := range q { + // close the channel if the update has reached its execution time. + // Using >= in case some are somehow missed. + sUpdate = k.getSubmittedUpdate(ctx, id) + if ctx.BlockHeight() >= sUpdate.ExecutionTime { + k.closeChannel(ctx, sUpdate.Update) + } + } tags := sdk.NewTags() return tags -} \ No newline at end of file +} diff --git a/internal/x/paychan/handler.go b/internal/x/paychan/handler.go index d639f99a..d442f7c1 100644 --- a/internal/x/paychan/handler.go +++ b/internal/x/paychan/handler.go @@ -38,10 +38,15 @@ func handleMsgCreate(ctx sdk.Context, k Keeper, msg MsgCreate) sdk.Result { // Leaves validation to the keeper methods. func handleMsgSubmitUpdate(ctx sdk.Context, k Keeper, msg MsgSubmitUpdate) sdk.Result { - // if only sender sig then - tags, err := k.InitChannelCloseBySender() - // else (if there are both) - tags, err := k.ChannelCloseByReceiver() + participants := k.getChannel(ctx, msg.Update.ChannelID).Participants + + // if only sender signed + if msg.submitter == participants[0] { + tags, err := k.InitCloseChannelBySender() + // else if receiver signed + } else if msg.submitter == participants[len(participants)-1] { + tags, err := k.CloseChannelByReceiver() + } if err != nil { return err.Result() diff --git a/internal/x/paychan/keeper.go b/internal/x/paychan/keeper.go index d3648884..dfcf99c6 100644 --- a/internal/x/paychan/keeper.go +++ b/internal/x/paychan/keeper.go @@ -31,38 +31,11 @@ func NewKeeper(cdc *wire.Codec, key sdk.StoreKey, ck bank.Keeper) Keeper { return keeper } -// bunch of business logic ... -/* -// Reteive a payment channel struct from the blockchain store. -// They are indexed by a concatenation of sender address, receiver address, and an integer. -func (k Keeper) GetPaychan(ctx sdk.Context, sender sdk.Address, receiver sdk.Address, id int64) (Paychan, bool) { - // Return error as second argument instead of bool? - var pych Paychan - // load from DB - store := ctx.KVStore(k.storeKey) - bz := store.Get(paychanKey(sender, receiver, id)) - if bz == nil { - return pych, false - } - // unmarshal - k.cdc.MustUnmarshalBinary(bz, &pych) - // return - return pych, true -} +// ============================================== Main Business Logic -// Store payment channel struct in blockchain store. -func (k Keeper) setPaychan(ctx sdk.Context, pych Paychan) { - store := ctx.KVStore(k.storeKey) - // marshal - bz := k.cdc.MustMarshalBinary(pych) // panics if something goes wrong - // write to db - pychKey := paychanKey(pych.Sender, pych.Receiver, pych.Id) - store.Set(pychKey, bz) // panics if something goes wrong -} -*/ // Create a new payment channel and lock up sender funds. -func (k Keeper) CreatePaychan(ctx sdk.Context, sender sdk.Address, receiver sdk.Address, coins sdk.Coins) (sdk.Tags, sdk.Error) { +func (k Keeper) CreateChannel(ctx sdk.Context, sender sdk.Address, receiver sdk.Address, coins sdk.Coins) (sdk.Tags, sdk.Error) { // TODO do validation and maybe move somewhere nicer /* // args present @@ -110,38 +83,229 @@ func (k Keeper) CreatePaychan(ctx sdk.Context, sender sdk.Address, receiver sdk. // save to db k.setChannel(ctx, channel) - // TODO create tags - //tags := sdk.NewTags() + // TODO add to tags + return tags, err } -/* This is how gov manages creating unique IDs. Needs to be deterministic - can't use UUID -func (keeper Keeper) getNewChannelID(ctx sdk.Context) (channelID int64, err sdk.Error) { - store := ctx.KVStore(keeper.storeKey) - bz := store.Get(KeyNextProposalID) - if bz == nil { - return -1, ErrInvalidGenesis(keeper.codespace, "InitialProposalID never set") + + +func (k Keeper) InitCloseChannelBySender(update Update) { + // This is roughly the default path for non unidirectional channels + + // TODO Validate update - e.g. check signed by sender + + q := k.getSubmittedUpdateQueue(ctx) + if q.Contains(update.ChannelID) { + // Someone has previously tried to update channel + existingSUpdate := k.getSubmittedUpdate(ctx, update.ChannelID) + k.addToSubmittedUpdateQueue(ctx, k.applyNewUpdate(existingSUpdate, update)) + } else { + // No one has tried to update channel. + submittedUpdate := SubmittedUpdate{ + Update: update + executionTime: ctx.BlockHeight()+ChannelDisputeTime //TODO check what exactly BlockHeight refers to + } + k.addToSubmittedUpdateQueue(ctx, submittedUpdate) } - keeper.cdc.MustUnmarshalBinary(bz, &proposalID) - bz = keeper.cdc.MustMarshalBinary(proposalID + 1) - store.Set(KeyNextProposalID, bz) - return proposalID, nil -*/ - -func (k Keeper) ChannelCloseByReceiver() () { - // Validate inputs - // k.closeChannel } -func (k Keeper) InitChannelCloseBySender() () { - // Validate inputs - // Create SubmittedUpdate from Update and add to queue +func (k Keeper) CloseChannelByReceiver(update Update) () { + // TODO Validate update + + // Check if there is an update in the queue already + q := k.getSubmittedUpdateQueue(ctx) + if q.Contains(update.ChannelID) { + // Someone has previously tried to update channel but receiver has final say + k.removeFromSubmittedUpdateQueue(ctx, update.ChannelID) + } + + k.closeChannel(ctx, update) } -func (k Keeper) closeChannel() () { - // Remove corresponding SubmittedUpdate from queue (if it exist) +// Main function that compare updates against each other. +// Pure function +func (k Keeper) applyNewUpdate(existingSUpdate, proposedUpdate) SubmittedUpdate { + var returnUpdate SubmittedUpdate + + if existingSUpdate.sequence > proposedUpdate.sequence { + // update accepted + returnUpdate = SubmittedUpdate{ + Update: proposedUpdate + ExecutionTime: existingSUpdate.ExecutionTime + } + } else { + // update rejected + returnUpdate = existingSUpdate + } + return returnUpdate +} + +func (k Keeper) closeChannel(ctx sdk.Context, update Update) { + channel := k.getChannel(ctx, update.ChannelID) + // Add coins to sender and receiver - // Delete Channel + for address, coins := range update.CoinsUpdate { + // TODO check somewhere if coins are not negative? + k.ck.AddCoins(ctx, address, coins) + } + + k.deleteChannel(ctx, update.ChannelID) +} + + + +// =========================================== QUEUE + + +func (k Keeper) addToSubmittedUpdatesQueue(ctx sdk.Context, sUpdate SubmittedUpdate) { + // always overwrite prexisting values - leave paychan logic to higher levels + // get current queue + q := k.getSubmittedUpdateQueue(ctx) + // append ID to queue + if q.Contains(sUpdate.ChannelID)! { + q = append(q, sUpdate.ChannelID) + } + // set queue + k.setSubmittedUpdateQueue(ctx, q) + // store submittedUpdate + k.setSubmittedUpdate(ctx, sUpdate) +} +func (k Keeper) removeFromSubmittdUpdatesQueue(ctx sdk.Context, channelID) { + // get current queue + q := k.getSubmittedUpdateQueue(ctx) + // remove id + q.RemoveMatchingElements(channelID) + // set queue + k.setSubmittedUpdateQueue(ctx, q) + // delete submittedUpdate + k.deleteSubmittedUpdate(ctx, channelID) +} + +func (k Keeper) getSubmittedUpdatesQueue(ctx sdk.Context) (Queue, bool) { + // load from DB + store := ctx.KVStore(k.storeKey) + bz := store.Get(k.getSubmittedUpdatesQueueKey()) + + var q Queue + if bz == nil { + return q, false + } + // unmarshal + k.cdc.MustUnmarshalBinary(bz, &q) + // return + return q, true +} +func (k Keeper) setSubmittedUpdatesQueue(ctx sdk.Context, q Queue) { + store := ctx.KVStore(k.storeKey) + // marshal + bz := k.cdc.MustMarshalBinary(q) + // write to db + key := k.getSubmittedUpdatesQueueKey() + store.Set(key, bz) +} +func (k Keeper) getSubmittedUpdatesQueueKey() []byte { + return []byte("submittedUpdatesQueue") +} + +// ============= SUBMITTED UPDATES +// These are keyed by the IDs of thei associated Channels +// This section deals with only setting and getting + +func (k Keeper) getSubmittedUpdate(ctx sdk.Context, channelID ChannelID) (SubmittedUpdate, bool) { + + // load from DB + store := ctx.KVStore(k.storeKey) + bz := store.Get(k.getSubmittedUpdateKey(channelID)) + + var sUpdate SubmittedUpdate + if bz == nil { + return sUpdate, false + } + // unmarshal + k.cdc.MustUnmarshalBinary(bz, &sUpdate) + // return + return sUpdate, true +} + +// Store payment channel struct in blockchain store. +func (k Keeper) setSubmittedUpdate(ctx sdk.Context, sUpdate SubmittedUpdate) { + store := ctx.KVStore(k.storeKey) + // marshal + bz := k.cdc.MustMarshalBinary(sUpdate) // panics if something goes wrong + // write to db + key := k.getSubmittedUpdateKey(sUpdate.channelID) + store.Set(key, bz) // panics if something goes wrong +} + +func (k Keeper) deleteSubmittedUpdate(ctx sdk.Context, channelID ) { + store := ctx.KVStore(k.storeKey) + store.Delete(k.getSubmittedUpdateKey(channelID)) + // TODO does this have return values? What happens when key doesn't exist? +} +func (k Keeper) getSubmittedUpdateKey(channelID ChannelID) []byte { + return []byte(fmt.Sprintf("submittedUpdate:%d", channelID)) +} + + +// ========================================== CHANNELS + +// Reteive a payment channel struct from the blockchain store. +func (k Keeper) getChannel(ctx sdk.Context, channelID ChannelID) (Channel, bool) { + + // load from DB + store := ctx.KVStore(k.storeKey) + bz := store.Get(k.getChannelKey(channelID)) + + var channel Channel + if bz == nil { + return channel, false + } + // unmarshal + k.cdc.MustUnmarshalBinary(bz, &channel) + // return + return channel, true +} + +// Store payment channel struct in blockchain store. +func (k Keeper) setChannel(ctx sdk.Context, channel Channel) { + store := ctx.KVStore(k.storeKey) + // marshal + bz := k.cdc.MustMarshalBinary(channel) // panics if something goes wrong + // write to db + key := sdk.getChannelKey(channel.ID) + store.Set(key, bz) // panics if something goes wrong +} + +func (k Keeper) deleteChannel(ctx sdk.Context, channelID ) { + store := ctx.KVStore(k.storeKey) + store.Delete(k.getChannelKey(channelID)) + // TODO does this have return values? What happens when key doesn't exist? +} + +func (k Keeper) getNewChannelID(ctx sdk.Context) (int64, error) { + // get last channel ID + store := k.KVStore(k.storeKey) + bz := store.Get(k.getLastChannelIDKey()) + if bz == nil { + return nil, // TODO throw some error (assumes this has been initialized elsewhere) or just set to zero here + } + var lastID ChannelID + k.cdc.MustUnmarshalBinary(bz, &lastID) + // increment to create new one + newID := lastID+1 + bz = k.cdc.MustMarshalBinary(newID) + // set last channel id again + store.Set(k.getLastChannelIDKey(), bz) + // return + return newID +} + +func (k Keeper) getChannelKey(channelID ChannelID) []byte { + return []bytes(fmt.Sprintf("channel:%d", channelID)) +} +func (k Keeper) getLastChannelIDKey() []byte { + return []bytes("lastChannelID") } /* // Close a payment channel and distribute funds to participants. diff --git a/internal/x/paychan/types.go b/internal/x/paychan/types.go index bbe6738b..2b532c74 100644 --- a/internal/x/paychan/types.go +++ b/internal/x/paychan/types.go @@ -11,15 +11,17 @@ import ( // Participants is limited to two as currently these are unidirectional channels. // Last participant is designated as receiver. type Channel struct { - ID int64 + ID ChannelID Participants [2]sdk.AccAddress Coins sdk.Coins } +type ChannelID int64 // TODO should this be positive only + // The data that is passed between participants as payments, and submitted to the blockchain to close a channel. type Update struct { ChannelID int64 - CoinsUpdate //TODO type + CoinsUpdate map[sdk.AccAddress]sdk.Coins Sequence int64 sig // TODO type, only sender needs to sign } @@ -27,9 +29,35 @@ type Update struct { // An update that has been submitted to the blockchain, but not yet acted on. type SubmittedUpdate { Update - executionDate int64 // BlockHeight + executionTime int64 // BlockHeight } +type SubmittedUpdateQueue []ChannelID +// Check if value is in queue +func (suq SubmittedChannelID) Contains(channelID ChannelID) bool { + found := false + for _, id := range(suq) { + if id == channelID { + found = true + break + } + } + return found +} +// Remove all values from queue that match argument +func (suq SubmittedUpdateQueue) RemoveMatchingElements(channelID ChannelID) { + newSUQ := SubmittedUpdateQueue{} + + for _, id := range(suq) { + if id != channelID { + newSUQ = append(newSUQ, id) + } + } + suq = newSUQ +} + +var ChannelDisputeTime = 2000 // measured in blocks + /* MESSAGE TYPES */ /* Message implement the sdk.Msg interface: @@ -120,7 +148,7 @@ func (msg MsgCreate) GetSigners() []sdk.Address { // A message to close a payment channel. type MsgSubmitUpdate struct { Update - // might need a "signer" to be able to say who is signing this as either can or not + submitter sdk.AccAddress } // func (msg MsgSubmitUpdate) NewMsgSubmitUpdate(update Update) MsgSubmitUpdate {