• 如果您觉得本站非常有看点,那么赶紧使用Ctrl+D 收藏吧

并发性:限制goroutines不能按预期工作

go 来源:ma3x 5次浏览

我正在处理搜索引擎项目。为了更快的抓取速度,我使用每个链接访问一个goroutine。但是我遇到了两个问题,让我感到惊讶!并发性:限制goroutines不能按预期工作

第一个是一个代码示例:

package main 

import "fmt" 
import "sync" 
import "time" 

type test struct { 
    running int 
    max  int 
    mu  sync.Mutex 
} 

func main() { 
    t := &test{max: 1000} 
    t.start() 
} 

func (t *test) start() { 
    for { 
     if t.running >= t.max { 
      time.Sleep(200 * time.Millisecond) 
      continue 
     } 
     go t.visit() 
    } 
} 

func (t *test) visit() { 
    t.inc() 
    defer t.dec() 
    fmt.Println("visit called") 
    fmt.Printf("running: %d, max: %d\n", t.running, t.max) 
    fmt.Println() 
    time.Sleep(time.Second) 
} 

func (t *test) inc() { 
    t.mu.Lock() 
    t.running++ 
    t.mu.Unlock() 
} 
func (t *test) dec() { 
    t.mu.Lock() 
    t.running-- 
    t.mu.Unlock() 
} 

输出(裁剪):

running: 2485, max: 1000 

running: 2485, max: 1000 

running: 2485, max: 1000 

visit called 
running: 2485, max: 1000 

running: 2485, max: 1000 

running: 2485, max: 1000 

running: 2485, max: 1000 


visit called 
running: 2485, max: 1000 


running: 2485, max: 1000 

虽然我明确地检查循环中最大允许够程,为什么跑够程超过最大?


第二个是真正的项目代码的一部分:

UPDATE:这实际上是固定的,问题是在LinkProvider.Get()实现,时间太长,返回。 parser.visit()同时返回,但Parser.Start()中的循环正在等待新链接…并且输出看起来是连续的!

package worker 

import (
    "errors" 
    "fmt" 
    "sync" 
    "time" 

    "bitbucket.org/codictive/ise/components/crawler/models" 
    "bitbucket.org/codictive/ise/components/log/logger" 
    "bitbucket.org/codictive/ise/core/component" 
    "bitbucket.org/codictive/ise/core/database" 
) 

// Worker is a service that processes crawlable links. 
type Worker interface { 
    Start() error 
    Stop() error 
    Restart() error 
    Status() Status 
} 

// Status contains runtime status of a worker. 
type Status struct { 
    Running    bool 
    RunningParsersCount int 
} 

// New return a new defaultWorker with given config. 
func New() Worker { 
    return &defaultWorker{ 
     flow: make(chan bool), 
     stop: make(chan bool), 
    } 
} 

// defaultWorker is a Worker implementation. 
type defaultWorker struct { 
    linkProvider   LinkProvider 
    handlersLimit  int 
    runningHandlersCount int 
    running    bool 
    mu     sync.Mutex 
    flow     chan bool 
    stop     chan bool 
} 

func (w *defaultWorker) init() { 
    prate, _ := component.IntConfig("crawler.crawlInterval") 
    arate, _ := component.IntConfig("crawler.ad_crawlInterval") 
    concLimit, _ := component.IntConfig("crawler.concurrent_workers_limit") 
    w.linkProvider = NewLinkProvider(time.Duration(prate)*time.Hour, time.Duration(arate)*time.Hour) 
    w.handlersLimit = concLimit 
} 

// Start runs worker. 
func (w *defaultWorker) Start() error { 
    logger.Info("Starting crawler worker...") 
    w.running = true 
    w.init() 

    defer func() { 
     w.running = false 
     logger.Info("Worker stopped.") 
    }() 

    for { 
     select { 
     case <-w.stop: 
      w.flow <- true 
      return nil 
     default: 
      fmt.Printf("running: %d limit: %d\n", w.runningHandlersCount, w.handlersLimit) 
      if w.runningHandlersCount >= w.handlersLimit { 
       time.Sleep(200 * time.Millisecond) 
       continue 
      } 

      link := w.linkProvider.Get() 
      if link.ID == 0 { 
       logger.Debug("no link to crawl") 
       time.Sleep(time.Minute) 
       continue 
      } 

      go func(l *models.CrawlLink) { 
       go w.visit(l) 
      }(link) 
     } 
    } 
} 

// Stop stops worker. 
func (w *defaultWorker) Stop() error { 
    logger.Info("Stopping crawler worker...") 
    w.stop <- true 
    select { 
    case <-w.flow: 
     return nil 
    case <-time.After(2 * time.Minute): 
     return errors.New("worker did not stopped properly") 
    } 
} 

// Restart re-starts worker. 
func (w *defaultWorker) Restart() error { 
    logger.Info("Re-starting crawler worker...") 
    w.stop <- true 
    select { 
    case <-w.flow: 
     return w.Start() 
    case <-time.After(2 * time.Minute): 
     return errors.New("can not restart worker") 
    } 
} 

// Status reports current worker status. 
func (w *defaultWorker) Status() Status { 
    return Status{ 
     Running:    w.running, 
     RunningParsersCount: w.runningHandlersCount, 
    } 
} 

func (w *defaultWorker) visit(cl *models.CrawlLink) { 
    w.incrementRunningWorkers() 
    defer w.decrementRunningWorkers() 

    if cl == nil { 
     logger.Warning("[crawler.worker.visit] Can not visit a nil link.") 
     return 
    } 
    if err := cl.LoadFull(); err != nil { 
     logger.Error("[crawler.worker.visit] Can not load link relations. (%v)", err) 
     return 
    } 

    parser := NewParser(cl) 
    if parser == nil { 
     logger.Error("[crawler.worker.visit] Parser instantiation failed.") 
     return 
    } 
    before := time.Now() 
    if err := parser.Parse(); err != nil { 
     cl.Error = err.Error() 
     logger.Error("[crawler.worker.visit] Parser finished with error: %v.", err) 
     db := database.Open() 
     if err := db.Save(&cl).Error; err != nil { 
      logger.Error("[crawler.worker.visit] can not update crawl link. (%v)", err) 
     } 
    } 
    logger.Debug("[crawler.worker.visit] Parsing %q took %s.", cl.URL, time.Since(before)) 
    fmt.Printf("[crawler.worker.visit] Parsing %q took %s.\n", cl.URL, time.Since(before)) 
} 

func (w *defaultWorker) incrementRunningWorkers() { 
    w.mu.Lock() 
    w.runningHandlersCount++ 
    w.mu.Unlock() 
    fmt.Printf("increment called. current: %d\n", w.runningHandlersCount) 
} 

func (w *defaultWorker) decrementRunningWorkers() { 
    w.mu.Lock() 
    w.runningHandlersCount-- 
    w.mu.Unlock() 
    fmt.Printf("decrement called. current: %d\n", w.runningHandlersCount) 
} 

输出:

2017/12/03 11:24:36 profile: cpu profiling enabled, /var/folders/1x/01d32mrs2plcj9pnb3mnnrhw0000gn/T/profile924798503/cpu.pprof 
running: 0 limit: 1000 
Running server on :8080 
running: 0 limit: 1000 
increment called. current: 1 
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D9%81%D8%B1%D8%A7%D8%B4%D8%A8%D9%86%D8%AF/%D8%A7%D9%85%D9%84%D8%A7%DA%A9/%D9%81%D8%B1%D9%88%D8%B4-%D8%A7%D8%AF%D8%A7%D8%B1%DB%8C-%D9%88-%D8%AA%D8%AC%D8%A7%D8%B1%DB%8C" took 370.140513ms. 
decrement called. current: 0 
running: 0 limit: 1000 
increment called. current: 1 
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D8%B3%D8%A7%D9%85%D8%B3%D9%88%D9%86%DA%AF-s3-neo-24252682.html" took 193.193357ms. 
decrement called. current: 0 
running: 0 limit: 1000 
increment called. current: 1 
[crawler.worker.visit] Parsing "https://www.sheypoor.com/%D9%85%DB%8C%D8%B2%D9%88%D8%B5%D9%86%D8%AF%D9%84%DB%8C-%D8%AA%D8%A7%D9%84%D8%A7%D8%B1-22399505.html" took 201.636741ms. 
decrement called. current: 0 
running: 0 limit: 1000 
increment called. current: 1 
[crawler.worker.visit] Parsing "https://www.sheypoor.com/50000%D9%85%D8%AA%D8%B1-%D8%B2%D9%85%DB%8C%D9%86-%D9%85%D8%B1%D8%BA%D8%AF%D8%A7%D8%B1%DB%8C-%D9%88%D8%A7%D9%82%D8%B9-%D8%AF%D8%B1-%D8%AE%D8%B1%D9%85%D8%AF%D8%B1%D9%87-23075331.html" took 210.360596ms. 
decrement called. current: 0 
^C2017/12/03 11:24:43 profile: caught interrupt, stopping profiles 
2017/12/03 11:24:43 profile: cpu profiling disabled, /var/folders/1x/01d32mrs2plcj9pnb3mnnrhw0000gn/T/profile924798503/cpu.pprof 

正如你可以看到visit方法运行完全顺序!无论我是只用go visit(link)还是上面的代码中使用的名称。 为什么会发生这种情况?什么是阻止循环迭代?


===========解决方案如下:

我会使用渠道和拦截功能解决了这个问题 – https://play.golang.org/p/KbYOI1oGNs

主要的变化是,我们有一个通道guard,我们把有新项目时够程启动(如果大小达到限制它会阻止) ,完成后释放。

func (t *test) start() { 
    maxGoroutines := t.max 
    guard := make(chan struct{}, maxGoroutines) 

    for { 
     guard <- struct{}{} 
     go func() { 
      t.visit() 
      <-guard 
     }() 
    } 
} 

版权声明:本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系管理员进行删除。
喜欢 (0)