diff --git a/client/group_msg.go b/client/group_msg.go index e112d068..6bd049c5 100644 --- a/client/group_msg.go +++ b/client/group_msg.go @@ -40,7 +40,7 @@ func (c *QQClient) SendGroupMessage(groupCode int64, m *message.SendingMessage, return nil } if (msgLen > 200 || imgCount > 1) && !useFram { - ret := c.sendGroupMessage(groupCode, true, + ret := c.sendGroupMessage(groupCode, false, &message.SendingMessage{Elements: []message.IMessageElement{ c.uploadGroupLongMessage(groupCode, &message.ForwardMessage{Nodes: []*message.ForwardNode{ diff --git a/client/highway.go b/client/highway.go index 79dbf1b2..3be5d630 100644 --- a/client/highway.go +++ b/client/highway.go @@ -17,6 +17,7 @@ import ( "os" "strconv" "sync" + "sync/atomic" "time" ) @@ -199,15 +200,19 @@ func (c *QQClient) highwayUploadFileMultiThreadingByBDH(path string, cmdId int32 Id int BeginOffset int64 EndOffset int64 - Uploaded bool - Uploading bool } h := md5.New() _, _ = io.Copy(h, file) fh := h.Sum(nil) - var blockSize int64 = 1024 * 512 - var blocks []*BlockMetaData - var rspExt []byte + const blockSize int64 = 1024 * 512 + var ( + blocks []*BlockMetaData + rspExt []byte + BlockId = ^uint32(0) // -1 + uploadedCount uint32 + lastErr error + cond = sync.NewCond(&sync.Mutex{}) + ) // Init Blocks { var temp int64 = 0 @@ -225,28 +230,8 @@ func (c *QQClient) highwayUploadFileMultiThreadingByBDH(path string, cmdId int32 EndOffset: stat.Size(), }) } - var nextLock sync.Mutex - var lastErr error - nextBlockId := func() int { - nextLock.Lock() - defer nextLock.Unlock() - for i, block := range blocks { - if !block.Uploading && !block.Uploaded { - block.Uploading = true - return i - } - } - return -1 - } - uploadedCount := func() (c int) { - for _, block := range blocks { - if block.Uploaded { - c++ - } - } - return - } doUpload := func() error { + defer cond.Signal() // 成功和失败都得提醒一次 conn, err := net.DialTimeout("tcp", c.srvSsoAddrs[0], time.Second*20) if err != nil { return errors.Wrap(err, "connect error") @@ -261,16 +246,17 @@ func (c *QQClient) highwayUploadFileMultiThreadingByBDH(path string, cmdId int32 return errors.Wrap(err, "echo error") } for { - nextId := nextBlockId() - if nextId == -1 { + nextId := atomic.AddUint32(&BlockId, 1) + if nextId >= uint32(len(blocks)) { break } block := blocks[nextId] if block.Id == len(blocks)-1 { - t := time.Now() - for uploadedCount() != len(blocks)-1 && lastErr == nil && time.Now().Sub(t).Seconds() < 5 { - time.Sleep(time.Millisecond * 10) + cond.L.Lock() + for atomic.LoadUint32(&uploadedCount) != uint32(len(blocks)-1) && lastErr == nil { + cond.Wait() } + cond.L.Unlock() if lastErr != nil { break } @@ -331,7 +317,7 @@ func (c *QQClient) highwayUploadFileMultiThreadingByBDH(path string, cmdId int32 if rspHead.RspExtendinfo != nil { rspExt = rspHead.RspExtendinfo } - block.Uploaded = true + atomic.AddUint32(&uploadedCount, 1) } return nil }