1
0
mirror of https://github.com/Mrs4s/MiraiGo.git synced 2025-05-05 03:23:50 +08:00

Merge pull request #93 from wdvxdr1123/patch/atomic

atomic!
This commit is contained in:
Mrs4s 2021-01-06 01:32:34 +08:00 committed by GitHub
commit 72521dec9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 33 deletions

View File

@ -40,7 +40,7 @@ func (c *QQClient) SendGroupMessage(groupCode int64, m *message.SendingMessage,
return nil return nil
} }
if (msgLen > 200 || imgCount > 1) && !useFram { if (msgLen > 200 || imgCount > 1) && !useFram {
ret := c.sendGroupMessage(groupCode, true, ret := c.sendGroupMessage(groupCode, false,
&message.SendingMessage{Elements: []message.IMessageElement{ &message.SendingMessage{Elements: []message.IMessageElement{
c.uploadGroupLongMessage(groupCode, c.uploadGroupLongMessage(groupCode,
&message.ForwardMessage{Nodes: []*message.ForwardNode{ &message.ForwardMessage{Nodes: []*message.ForwardNode{

View File

@ -17,6 +17,7 @@ import (
"os" "os"
"strconv" "strconv"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@ -199,15 +200,19 @@ func (c *QQClient) highwayUploadFileMultiThreadingByBDH(path string, cmdId int32
Id int Id int
BeginOffset int64 BeginOffset int64
EndOffset int64 EndOffset int64
Uploaded bool
Uploading bool
} }
h := md5.New() h := md5.New()
_, _ = io.Copy(h, file) _, _ = io.Copy(h, file)
fh := h.Sum(nil) fh := h.Sum(nil)
var blockSize int64 = 1024 * 512 const blockSize int64 = 1024 * 512
var blocks []*BlockMetaData var (
var rspExt []byte blocks []*BlockMetaData
rspExt []byte
BlockId = ^uint32(0) // -1
uploadedCount uint32
lastErr error
cond = sync.NewCond(&sync.Mutex{})
)
// Init Blocks // Init Blocks
{ {
var temp int64 = 0 var temp int64 = 0
@ -225,28 +230,8 @@ func (c *QQClient) highwayUploadFileMultiThreadingByBDH(path string, cmdId int32
EndOffset: stat.Size(), EndOffset: stat.Size(),
}) })
} }
var nextLock sync.Mutex
var lastErr error
nextBlockId := func() int {
nextLock.Lock()
defer nextLock.Unlock()
for i, block := range blocks {
if !block.Uploading && !block.Uploaded {
block.Uploading = true
return i
}
}
return -1
}
uploadedCount := func() (c int) {
for _, block := range blocks {
if block.Uploaded {
c++
}
}
return
}
doUpload := func() error { doUpload := func() error {
defer cond.Signal() // 成功和失败都得提醒一次
conn, err := net.DialTimeout("tcp", c.srvSsoAddrs[0], time.Second*20) conn, err := net.DialTimeout("tcp", c.srvSsoAddrs[0], time.Second*20)
if err != nil { if err != nil {
return errors.Wrap(err, "connect error") return errors.Wrap(err, "connect error")
@ -261,16 +246,17 @@ func (c *QQClient) highwayUploadFileMultiThreadingByBDH(path string, cmdId int32
return errors.Wrap(err, "echo error") return errors.Wrap(err, "echo error")
} }
for { for {
nextId := nextBlockId() nextId := atomic.AddUint32(&BlockId, 1)
if nextId == -1 { if nextId >= uint32(len(blocks)) {
break break
} }
block := blocks[nextId] block := blocks[nextId]
if block.Id == len(blocks)-1 { if block.Id == len(blocks)-1 {
t := time.Now() cond.L.Lock()
for uploadedCount() != len(blocks)-1 && lastErr == nil && time.Now().Sub(t).Seconds() < 5 { for atomic.LoadUint32(&uploadedCount) != uint32(len(blocks)-1) && lastErr == nil {
time.Sleep(time.Millisecond * 10) cond.Wait()
} }
cond.L.Unlock()
if lastErr != nil { if lastErr != nil {
break break
} }
@ -331,7 +317,7 @@ func (c *QQClient) highwayUploadFileMultiThreadingByBDH(path string, cmdId int32
if rspHead.RspExtendinfo != nil { if rspHead.RspExtendinfo != nil {
rspExt = rspHead.RspExtendinfo rspExt = rspHead.RspExtendinfo
} }
block.Uploaded = true atomic.AddUint32(&uploadedCount, 1)
} }
return nil return nil
} }