Skip to content

Coping with sqlchain soft forks #201

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jan 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 44 additions & 16 deletions sqlchain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func NewChainWithContext(ctx context.Context, c *Config) (chain *Chain, err erro
return
}

log.Debugf("create new chain bdb %s", bdbFile)
log.WithField("db", c.DatabaseID).Debugf("create new chain bdb %s", bdbFile)

// Open LevelDB for ack/request/response
tdbFile := c.ChainFilePrefix + "-ack-req-resp.ldb"
Expand All @@ -172,7 +172,7 @@ func NewChainWithContext(ctx context.Context, c *Config) (chain *Chain, err erro
return
}

log.Debugf("create new chain tdb %s", tdbFile)
log.WithField("db", c.DatabaseID).Debugf("create new chain tdb %s", tdbFile)

// Open x.State
var (
Expand All @@ -197,7 +197,7 @@ func NewChainWithContext(ctx context.Context, c *Config) (chain *Chain, err erro
}
addr, err = crypto.PubKeyHash(pk.PubKey())
if err != nil {
log.WithError(err).Warning("failed to generate addr in NewChain")
log.WithError(err).WithField("db", c.DatabaseID).Warning("failed to generate addr in NewChain")
return
}

Expand Down Expand Up @@ -283,7 +283,7 @@ func LoadChainWithContext(ctx context.Context, c *Config) (chain *Chain, err err
}
addr, err = crypto.PubKeyHash(pk.PubKey())
if err != nil {
log.WithError(err).Warning("failed to generate addr in LoadChain")
log.WithError(err).WithField("db", c.DatabaseID).Warning("failed to generate addr in LoadChain")
return
}

Expand Down Expand Up @@ -328,6 +328,7 @@ func LoadChainWithContext(ctx context.Context, c *Config) (chain *Chain, err err
log.WithFields(log.Fields{
"peer": chain.rt.getPeerInfoString(),
"state": st,
"db": c.DatabaseID,
}).Debug("loading state from database")

// Read blocks and rebuild memory index
Expand Down Expand Up @@ -355,6 +356,7 @@ func LoadChainWithContext(ctx context.Context, c *Config) (chain *Chain, err err
log.WithFields(log.Fields{
"peer": chain.rt.getPeerInfoString(),
"block": block.BlockHash().String(),
"db": c.DatabaseID,
}).Debug("loading block from database")

if last == nil {
Expand Down Expand Up @@ -413,6 +415,7 @@ func LoadChainWithContext(ctx context.Context, c *Config) (chain *Chain, err err
log.WithFields(log.Fields{
"height": h,
"header": resp.Hash().String(),
"db": c.DatabaseID,
}).Debug("loaded new resp header")
}
if err = respIter.Error(); err != nil {
Expand All @@ -434,6 +437,7 @@ func LoadChainWithContext(ctx context.Context, c *Config) (chain *Chain, err err
log.WithFields(log.Fields{
"height": h,
"header": ack.Hash().String(),
"db": c.DatabaseID,
}).Debug("loaded new ack header")
}
if err = respIter.Error(); err != nil {
Expand Down Expand Up @@ -493,6 +497,7 @@ func (c *Chain) pushBlock(b *types.Block) (err error) {
"index": i,
"producer": b.Producer(),
"block_hash": b.BlockHash(),
"db": c.databaseID,
}).WithError(ierr).Warn("failed to add response to ackIndex")
}
}
Expand All @@ -502,6 +507,7 @@ func (c *Chain) pushBlock(b *types.Block) (err error) {
"index": i,
"producer": b.Producer(),
"block_hash": b.BlockHash(),
"db": c.databaseID,
}).WithError(ierr).Warn("failed to remove Ack from ackIndex")
}
}
Expand All @@ -524,6 +530,7 @@ func (c *Chain) pushBlock(b *types.Block) (err error) {
return "|"
}(), st.Head.String()[:8]),
"headHeight": c.rt.getHead().Height,
"db": c.databaseID,
}).Info("pushed new block")
}

Expand All @@ -532,7 +539,7 @@ func (c *Chain) pushBlock(b *types.Block) (err error) {

// pushAckedQuery pushes a acknowledged, signed and verified query into the chain.
func (c *Chain) pushAckedQuery(ack *types.SignedAckHeader) (err error) {
log.Debugf("push ack %s", ack.Hash().String())
log.WithField("db", c.databaseID).Debugf("push ack %s", ack.Hash().String())
h := c.rt.getHeightFromTime(ack.SignedResponseHeader().Timestamp)
k := heightToKey(h)
var enc *bytes.Buffer
Expand Down Expand Up @@ -613,6 +620,7 @@ func (c *Chain) produceBlock(now time.Time) (err error) {
"curr_turn": c.rt.getNextTurn(),
"using_timestamp": now.Format(time.RFC3339Nano),
"block_hash": block.BlockHash().String(),
"db": c.databaseID,
}).Debug("produced new block")
// Advise new block to the other peers
var (
Expand Down Expand Up @@ -652,6 +660,7 @@ func (c *Chain) produceBlock(now time.Time) (err error) {
"curr_turn": c.rt.getNextTurn(),
"using_timestamp": now.Format(time.RFC3339Nano),
"block_hash": block.BlockHash().String(),
"db": c.databaseID,
}).WithError(err).Error("failed to advise new block")
}
}(s)
Expand Down Expand Up @@ -693,6 +702,7 @@ func (c *Chain) syncHead() {
"curr_turn": c.rt.getNextTurn(),
"head_height": c.rt.getHead().Height,
"head_block": c.rt.getHead().Head.String(),
"db": c.databaseID,
}).WithError(err).Debug(
"Failed to fetch block from peer")
} else {
Expand All @@ -710,6 +720,7 @@ func (c *Chain) syncHead() {
"curr_turn": c.rt.getNextTurn(),
"head_height": c.rt.getHead().Height,
"head_block": c.rt.getHead().Head.String(),
"db": c.databaseID,
}).Debug(
"Fetch block from remote peer successfully")
succ = true
Expand All @@ -725,6 +736,7 @@ func (c *Chain) syncHead() {
"curr_turn": c.rt.getNextTurn(),
"head_height": c.rt.getHead().Height,
"head_block": c.rt.getHead().Head.String(),
"db": c.databaseID,
}).Debug(
"Cannot get block from any peer")
}
Expand All @@ -750,6 +762,7 @@ func (c *Chain) runCurrentTurn(now time.Time) {
"head_height": c.rt.getHead().Height,
"head_block": c.rt.getHead().Head.String(),
"using_timestamp": now.Format(time.RFC3339Nano),
"db": c.databaseID,
}).Debug("run current turn")

if c.rt.getHead().Height < c.rt.getNextTurn()-1 {
Expand All @@ -760,6 +773,7 @@ func (c *Chain) runCurrentTurn(now time.Time) {
"head_height": c.rt.getHead().Height,
"head_block": c.rt.getHead().Head.String(),
"using_timestamp": now.Format(time.RFC3339Nano),
"db": c.databaseID,
}).Error("A block will be skipped")
}

Expand All @@ -773,6 +787,7 @@ func (c *Chain) runCurrentTurn(now time.Time) {
"time": c.rt.getChainTimeString(),
"curr_turn": c.rt.getNextTurn(),
"using_timestamp": now.Format(time.RFC3339Nano),
"db": c.databaseID,
}).WithError(err).Error(
"Failed to produce block")
}
Expand All @@ -796,6 +811,7 @@ func (c *Chain) mainCycle(ctx context.Context) {
// "head_block": c.rt.getHead().Head.String(),
// "using_timestamp": t.Format(time.RFC3339Nano),
// "duration": d,
// "db": c.databaseID,
//}).Debug("main cycle")
time.Sleep(d)
} else {
Expand All @@ -810,6 +826,7 @@ func (c *Chain) sync() (err error) {
log.WithFields(log.Fields{
"peer": c.rt.getPeerInfoString(),
"time": c.rt.getChainTimeString(),
"db": c.databaseID,
}).Debug("synchronizing chain state")

for {
Expand Down Expand Up @@ -861,6 +878,7 @@ func (c *Chain) processBlocks(ctx context.Context) {
log.WithFields(log.Fields{
"height": h,
"stashs": len(stash),
"db": c.databaseID,
}).Debug("read new height from channel")
if stash != nil {
wg.Add(1)
Expand All @@ -877,6 +895,7 @@ func (c *Chain) processBlocks(ctx context.Context) {
"head_block": c.rt.getHead().Head.String(),
"block_height": height,
"block_hash": block.BlockHash().String(),
"db": c.databaseID,
}).Debug("processing new block")

if height > c.rt.getNextTurn()-1 {
Expand All @@ -896,35 +915,36 @@ func (c *Chain) processBlocks(ctx context.Context) {
"head_block": c.rt.getHead().Head.String(),
"block_height": height,
"block_hash": block.BlockHash().String(),
"db": c.databaseID,
}).WithError(err).Error("Failed to check and push new block")
} else {
head := c.rt.getHead()
currentCount := uint64(head.node.count)
if currentCount%c.updatePeriod == 0 {
ub, err := c.billing(head.node)
if err != nil {
log.WithError(err).Error("billing failed")
log.WithError(err).WithField("db", c.databaseID).Error("billing failed")
}
// allocate nonce
nonceReq := &types.NextAccountNonceReq{}
nonceResp := &types.NextAccountNonceResp{}
nonceReq.Addr = *c.addr
if err = rpc.RequestBP(route.MCCNextAccountNonce.String(), nonceReq, nonceResp); err != nil {
// allocate nonce failed
log.WithError(err).Warning("allocate nonce for transaction failed")
log.WithError(err).WithField("db", c.databaseID).Warning("allocate nonce for transaction failed")
}
ub.Nonce = nonceResp.Nonce
if err = ub.Sign(c.pk); err != nil {
log.WithError(err).Warning("sign tx failed")
log.WithError(err).WithField("db", c.databaseID).Warning("sign tx failed")
}

addTxReq := &types.AddTxReq{TTL: 1}
addTxResp := &types.AddTxResp{}
addTxReq.Tx = ub
log.Debugf("nonce in processBlocks: %d, addr: %s",
log.WithField("db", c.databaseID).Debugf("nonce in processBlocks: %d, addr: %s",
addTxReq.Tx.GetAccountNonce(), addTxReq.Tx.GetAccountAddress())
if err = rpc.RequestBP(route.MCCAddTx.String(), addTxReq, addTxResp); err != nil {
log.WithError(err).Warning("send tx failed")
log.WithError(err).WithField("db", c.databaseID).Warning("send tx failed")
}
}
}
Expand Down Expand Up @@ -957,11 +977,13 @@ func (c *Chain) Stop() (err error) {
log.WithFields(log.Fields{
"peer": c.rt.getPeerInfoString(),
"time": c.rt.getChainTimeString(),
"db": c.databaseID,
}).Debug("stopping chain")
c.rt.stop(c.databaseID)
log.WithFields(log.Fields{
"peer": c.rt.getPeerInfoString(),
"time": c.rt.getChainTimeString(),
"db": c.databaseID,
}).Debug("chain service and workers stopped")
// Close LevelDB file
var ierr error
Expand All @@ -971,13 +993,15 @@ func (c *Chain) Stop() (err error) {
log.WithFields(log.Fields{
"peer": c.rt.getPeerInfoString(),
"time": c.rt.getChainTimeString(),
"db": c.databaseID,
}).WithError(ierr).Debug("chain database closed")
if ierr = c.tdb.Close(); ierr != nil && err == nil {
err = ierr
}
log.WithFields(log.Fields{
"peer": c.rt.getPeerInfoString(),
"time": c.rt.getChainTimeString(),
"db": c.databaseID,
}).WithError(ierr).Debug("chain database closed")
// Close state
if ierr = c.st.Close(false); ierr != nil && err == nil {
Expand All @@ -986,6 +1010,7 @@ func (c *Chain) Stop() (err error) {
log.WithFields(log.Fields{
"peer": c.rt.getPeerInfoString(),
"time": c.rt.getChainTimeString(),
"db": c.databaseID,
}).WithError(ierr).Debug("chain state storage closed")
return
}
Expand Down Expand Up @@ -1035,6 +1060,7 @@ func (c *Chain) CheckAndPushNewBlock(block *types.Block) (err error) {
"blockparent": block.ParentHash().String(),
"headblock": head.Head.String(),
"headheight": head.Height,
"db": c.databaseID,
}).WithError(err).Debug("checking new block from other peer")

if head.Height == height && head.Head.IsEqual(block.BlockHash()) {
Expand Down Expand Up @@ -1067,6 +1093,7 @@ func (c *Chain) CheckAndPushNewBlock(block *types.Block) (err error) {
"time": c.rt.getChainTimeString(),
"expected": next,
"actual": index,
"db": c.databaseID,
}).WithError(err).Error(
"Failed to check new block")
return ErrInvalidProducer
Expand Down Expand Up @@ -1235,13 +1262,14 @@ func (c *Chain) stat() {
"response_header_count": rc,
"query_tracker_count": tc,
"cached_block_count": bc,
"db": c.databaseID,
}).Info("chain mem stats")
// Print xeno stats
c.st.Stat(c.databaseID)
}

func (c *Chain) billing(node *blockNode) (ub *types.UpdateBilling, err error) {
log.Debugf("begin to billing from count %d", node.count)
log.WithField("db", c.databaseID).Debugf("begin to billing from count %d", node.count)
var (
i, j uint64
minerAddr proto.AccountAddress
Expand All @@ -1260,11 +1288,11 @@ func (c *Chain) billing(node *blockNode) (ub *types.UpdateBilling, err error) {
}
for _, tx := range block.QueryTxs {
if minerAddr, err = crypto.PubKeyHash(tx.Response.Signee); err != nil {
log.WithError(err).Warning("billing fail: miner addr")
log.WithError(err).WithField("db", c.databaseID).Warning("billing fail: miner addr")
return
}
if userAddr, err = crypto.PubKeyHash(tx.Request.Header.Signee); err != nil {
log.WithError(err).Warning("billing fail: miner addr")
log.WithError(err).WithField("db", c.databaseID).Warning("billing fail: miner addr")
return
}

Expand All @@ -1282,11 +1310,11 @@ func (c *Chain) billing(node *blockNode) (ub *types.UpdateBilling, err error) {

for _, req := range block.FailedReqs {
if minerAddr, err = crypto.PubKeyHash(block.Signee()); err != nil {
log.WithError(err).Warning("billing fail: miner addr")
log.WithError(err).WithField("db", c.databaseID).Warning("billing fail: miner addr")
return
}
if userAddr, err = crypto.PubKeyHash(req.Header.Signee); err != nil {
log.WithError(err).Warning("billing fail: user addr")
log.WithError(err).WithField("db", c.databaseID).Warning("billing fail: user addr")
return
}
if _, ok := minersMap[userAddr][minerAddr]; !ok {
Expand All @@ -1306,7 +1334,7 @@ func (c *Chain) billing(node *blockNode) (ub *types.UpdateBilling, err error) {
i = 0
j = 0
for userAddr, cost := range usersMap {
log.Debugf("user %s, cost %d", userAddr.String(), cost)
log.WithField("db", c.databaseID).Debugf("user %s, cost %d", userAddr.String(), cost)
ub.Users[i] = &types.UserCost{
User: userAddr,
Cost: cost,
Expand Down
6 changes: 3 additions & 3 deletions xenomint/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,9 @@ func (s *State) ReplayBlockWithContext(ctx context.Context, block *types.Block)
s.Lock()
defer s.Unlock()
for i, q := range block.QueryTxs {
if q.Request.Header.QueryType == types.ReadQuery {
continue
}
var query = &QueryTracker{Req: q.Request, Resp: &types.Response{Header: *q.Response}}
lastsp = s.getSeq()
if q.Response.ResponseHeader.LogOffset > lastsp {
Expand All @@ -487,9 +490,6 @@ func (s *State) ReplayBlockWithContext(ctx context.Context, block *types.Block)
}
// Replay query
for j, v := range q.Request.Payload.Queries {
if q.Request.Header.QueryType == types.ReadQuery {
continue
}
if q.Request.Header.QueryType != types.WriteQuery {
err = errors.Wrapf(ErrInvalidRequest, "replay block at %d:%d", i, j)
return
Expand Down
5 changes: 4 additions & 1 deletion xenomint/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,9 +431,12 @@ INSERT INTO t1 (k, v) VALUES (?, ?)`, concat(values[2:4])...),
// Try to replay modified block #0
var blockx = &types.Block{
QueryTxs: []*types.QueryAsTx{
&types.QueryAsTx{
{
Request: &types.Request{
Header: types.SignedRequestHeader{
RequestHeader: types.RequestHeader{
QueryType: types.WriteQuery,
},
DefaultHashSignVerifierImpl: verifier.DefaultHashSignVerifierImpl{
DataHash: [32]byte{
0, 0, 0, 0, 0, 0, 0, 1,
Expand Down