Optimizing a Golang Danmaku Server, Part 1: The Scheduler

Goroutine scheduling in Go implements an M:N model. The scheduling logic among g, p, and m (goroutines, logical processors, and OS threads) is quite complex, involving preemption, unbinding, and many other details that I won't expand on here.
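
As a quick orientation (an illustrative snippet of mine, not from the original post), two of the counts behind the M:N model are visible from user code: GOMAXPROCS is the number of Ps, and NumGoroutine is the number of Gs currently multiplexed onto the Ms.

package main

import (
    "fmt"
    "runtime"
)

func main() {
    // P: logical processors; the N in M:N scheduling of Gs onto Ms.
    fmt.Println("P (GOMAXPROCS):", runtime.GOMAXPROCS(0))
    // G: goroutines currently alive (the M count is not exposed directly).
    fmt.Println("G (goroutines):", runtime.NumGoroutine())
}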

Take this minimal program, whose only job is to call a function (itself) over and over:

package main

func main() {
    main()
}

After compiling, the prologue of main in the generated assembly looks like this:

TEXT    "".main+0(SB),$0-0
MOVQ    (TLS),CX
CMPQ    SP,(CX)
JHI    ,16
CALL    ,runtime.morestack_noctxt(SB)

A function's stack can turn out to be too small while it is running. In the prologue above, the SP register is compared against the current g's stackguard0; if SP has crossed the guard, the stack has outgrown its current allocation and must expand. This means that every function call carrying this prologue, and in particular every scheduled goroutine, can potentially trigger a stack expansion.
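
A small experiment (mine, not from the original post) makes the growth visible: record a local variable's raw address as a uintptr, force growth with deep recursion, and check the address again. Because Go stacks are contiguous and relocated on growth, the address changes; uintptr values are deliberately not adjusted by the stack copier, which is what lets us observe the move.

package main

import (
    "fmt"
    "unsafe"
)

//go:noinline
func grow(n int) {
    if n == 0 {
        return
    }
    var pad [512]byte // fatten each frame so growth triggers quickly
    pad[0] = byte(n)
    grow(n - 1)
    _ = pad[0] // keep pad live across the recursive call
}

func main() {
    var x int
    before := uintptr(unsafe.Pointer(&x)) // raw address, not updated on copy
    grow(64)                              // ~32 KB of frames vs. the initial 2 KB stack
    after := uintptr(unsafe.Pointer(&x))
    fmt.Printf("before=%#x after=%#x moved=%v\n", before, after, before != after)
}

The morestack_noctxt trampoline that the prologue calls looks like this: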

// morestack trampolines
TEXT runtime·morestack_noctxt(SB),NOSPLIT,$0
    MOVL    $0, DX
    JMP    runtime·morestack(SB)

morestack_noctxt simply zeroes the context register DX and jumps to morestack, a piece of assembly in the runtime: it records f's caller's PC and SP in m->morebuf, saves f's own context (PC, SP, g, ctxt) into g->sched, switches onto m's scheduler stack g0, and finally calls newstack.

TEXT runtime·morestack(SB),NOSPLIT,$0-0
    get_tls(CX)
    MOVL    g(CX), BX
    MOVL    g_m(BX), BX

    // Cannot grow scheduler stack (m->g0).
    MOVL    m_g0(BX), SI
    CMPL    g(CX), SI
    JNE    3(PC)
    CALL    runtime·badmorestackg0(SB)
    MOVL    0, AX

    // Cannot grow signal stack (m->gsignal).
    MOVL    m_gsignal(BX), SI
    CMPL    g(CX), SI
    JNE    3(PC)
    CALL    runtime·badmorestackgsignal(SB)
    MOVL    0, AX

    // Called from f.
    // Set m->morebuf to f's caller.
    MOVL    8(SP), AX    // f's caller's PC
    MOVL    AX, (m_morebuf+gobuf_pc)(BX)
    LEAL    16(SP), AX    // f's caller's SP
    MOVL    AX, (m_morebuf+gobuf_sp)(BX)
    get_tls(CX)
    MOVL    g(CX), SI
    MOVL    SI, (m_morebuf+gobuf_g)(BX)

    // Set g->sched to context in f.
    MOVL    0(SP), AX // f's PC
    MOVL    AX, (g_sched+gobuf_pc)(SI)
    MOVL    SI, (g_sched+gobuf_g)(SI)
    LEAL    8(SP), AX // f's SP
    MOVL    AX, (g_sched+gobuf_sp)(SI)
    MOVL    DX, (g_sched+gobuf_ctxt)(SI)

    // Call newstack on m->g0's stack.
    MOVL    m_g0(BX), BX
    MOVL    BX, g(CX)
    MOVL    (g_sched+gobuf_sp)(BX), SP
    CALL    runtime·newstack(SB)
    MOVL    $0, 0x1003    // crash if newstack returns
    RET

Finally newstack is called: it copies the old stack over to the new one, handles preemption and other scheduling-related bookkeeping, and does some GC-related work (which I haven't had time to look into yet).

// Called from runtime·morestack when more stack is needed.
// Allocate larger stack and relocate to new stack.
// Stack growth is multiplicative, for constant amortized cost.
//
// g->atomicstatus will be Grunning or Gscanrunning upon entry.
// If the GC is trying to stop this g then it will set preemptscan to true.
//
// This must be nowritebarrierrec because it can be called as part of
// stack growth from other nowritebarrierrec functions, but the
// compiler doesn't check this.
//
//go:nowritebarrierrec
func newstack() {
    thisg := getg()
    // TODO: double check all gp. shouldn't be getg().
    if thisg.m.morebuf.g.ptr().stackguard0 == stackFork {
        throw("stack growth after fork")
    }
    if thisg.m.morebuf.g.ptr() != thisg.m.curg {
        print("runtime: newstack called from g=", hex(thisg.m.morebuf.g), "\n"+"\tm=", thisg.m, " m->curg=", thisg.m.curg, " m->g0=", thisg.m.g0, " m->gsignal=", thisg.m.gsignal, "\n")
        morebuf := thisg.m.morebuf
        traceback(morebuf.pc, morebuf.sp, morebuf.lr, morebuf.g.ptr())
        throw("runtime: wrong goroutine in newstack")
    }

    gp := thisg.m.curg

    if thisg.m.curg.throwsplit {
        // Update syscallsp, syscallpc in case traceback uses them.
        morebuf := thisg.m.morebuf
        gp.syscallsp = morebuf.sp
        gp.syscallpc = morebuf.pc
        pcname, pcoff := "(unknown)", uintptr(0)
        f := findfunc(gp.sched.pc)
        if f.valid() {
            pcname = funcname(f)
            pcoff = gp.sched.pc - f.entry
        }
        print("runtime: newstack at ", pcname, "+", hex(pcoff),
            " sp=", hex(gp.sched.sp), " stack=[", hex(gp.stack.lo), ", ", hex(gp.stack.hi), "]\n",
            "\tmorebuf={pc:", hex(morebuf.pc), " sp:", hex(morebuf.sp), " lr:", hex(morebuf.lr), "}\n",
            "\tsched={pc:", hex(gp.sched.pc), " sp:", hex(gp.sched.sp), " lr:", hex(gp.sched.lr), " ctxt:", gp.sched.ctxt, "}\n")

        thisg.m.traceback = 2 // Include runtime frames
        traceback(morebuf.pc, morebuf.sp, morebuf.lr, gp)
        throw("runtime: stack split at bad time")
    }

    morebuf := thisg.m.morebuf
    thisg.m.morebuf.pc = 0
    thisg.m.morebuf.lr = 0
    thisg.m.morebuf.sp = 0
    thisg.m.morebuf.g = 0

    // NOTE: stackguard0 may change underfoot, if another thread
    // is about to try to preempt gp. Read it just once and use that same
    // value now and below.
    preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt

    // Be conservative about where we preempt.
    // We are interested in preempting user Go code, not runtime code.
    // If we're holding locks, mallocing, or preemption is disabled, don't
    // preempt.
    // This check is very early in newstack so that even the status change
    // from Grunning to Gwaiting and back doesn't happen in this case.
    // That status change by itself can be viewed as a small preemption,
    // because the GC might change Gwaiting to Gscanwaiting, and then
    // this goroutine has to wait for the GC to finish before continuing.
    // If the GC is in some way dependent on this goroutine (for example,
    // it needs a lock held by the goroutine), that small preemption turns
    // into a real deadlock.
    if preempt {
        if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
            // Let the goroutine keep running for now.
            // gp->preempt is set, so it will be preempted next time.
            gp.stackguard0 = gp.stack.lo + _StackGuard
            gogo(&gp.sched) // never return
        }
    }

    if gp.stack.lo == 0 {
        throw("missing stack in newstack")
    }
    sp := gp.sched.sp
    if sys.ArchFamily == sys.AMD64 || sys.ArchFamily == sys.I386 || sys.ArchFamily == sys.WASM {
        // The call to morestack cost a word.
        sp -= sys.PtrSize
    }
    if stackDebug >= 1 || sp < gp.stack.lo {
        print("runtime: newstack sp=", hex(sp), " stack=[", hex(gp.stack.lo), ", ", hex(gp.stack.hi), "]\n",
            "\tmorebuf={pc:", hex(morebuf.pc), " sp:", hex(morebuf.sp), " lr:", hex(morebuf.lr), "}\n",
            "\tsched={pc:", hex(gp.sched.pc), " sp:", hex(gp.sched.sp), " lr:", hex(gp.sched.lr), " ctxt:", gp.sched.ctxt, "}\n")
    }
    if sp < gp.stack.lo {
        print("runtime: gp=", gp, ", goid=", gp.goid, ", gp->status=", hex(readgstatus(gp)), "\n ")
        print("runtime: split stack overflow: ", hex(sp), " < ", hex(gp.stack.lo), "\n")
        throw("runtime: split stack overflow")
    }

    if preempt {
        if gp == thisg.m.g0 {
            throw("runtime: preempt g0")
        }
        if thisg.m.p == 0 && thisg.m.locks == 0 {
            throw("runtime: g is running but p is not")
        }
        // Synchronize with scang.
        casgstatus(gp, _Grunning, _Gwaiting)
        if gp.preemptscan {
            for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) {
                // Likely to be racing with the GC as
                // it sees a _Gwaiting and does the
                // stack scan. If so, gcworkdone will
                // be set and gcphasework will simply
                // return.
            }
            if !gp.gcscandone {
                // gcw is safe because we're on the
                // system stack.
                gcw := &gp.m.p.ptr().gcw
                scanstack(gp, gcw)
                gp.gcscandone = true
            }
            gp.preemptscan = false
            gp.preempt = false
            casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting)
            // This clears gcscanvalid.
            casgstatus(gp, _Gwaiting, _Grunning)
            gp.stackguard0 = gp.stack.lo + _StackGuard
            gogo(&gp.sched) // never return
        }

        // Act like goroutine called runtime.Gosched.
        casgstatus(gp, _Gwaiting, _Grunning)
        gopreempt_m(gp) // never return
    }

    // Allocate a bigger segment and move the stack.
    oldsize := gp.stack.hi - gp.stack.lo
    newsize := oldsize * 2
    if newsize > maxstacksize {
        print("runtime: goroutine stack exceeds ", maxstacksize, "-byte limit\n")
        throw("stack overflow")
    }

    // The goroutine must be executing in order to call newstack,
    // so it must be Grunning (or Gscanrunning).
    casgstatus(gp, _Grunning, _Gcopystack)

    // The concurrent GC will not scan the stack while we are doing the copy since
    // the gp is in a Gcopystack status.
    copystack(gp, newsize, true)
    if stackDebug >= 1 {
        print("stack grow done\n")
    }
    casgstatus(gp, _Gcopystack, _Grunning)
    gogo(&gp.sched)
}
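
One detail worth noting in the code above: when the doubled size exceeds maxstacksize, newstack throws "stack overflow". That limit is the one adjusted by runtime/debug.SetMaxStack (about 1 GB by default on 64-bit systems). As a quick illustrative experiment of mine, lowering the limit and recursing makes this path fire:

package main

import "runtime/debug"

//go:noinline
func recurse(n int) int {
    // Go does no tail-call elimination, so every call adds a frame and
    // the stack keeps doubling until it hits the configured limit.
    return recurse(n+1) + n
}

func main() {
    debug.SetMaxStack(1 << 20) // cap goroutine stacks at 1 MB
    recurse(0)                 // fatal error: stack overflow, thrown by newstack
}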

Symmetrically, when the portion of a stack actually in use drops below a certain threshold, Go shrinks the stack again. Now imagine tens of thousands, or even millions, of goroutines all resizing their stacks frequently: how much overhead does that impose on the running program? This matters most under high concurrency (an ordinary API service doesn't qualify), and the flame graphs we captured from our danmaku server under heavy load confirmed it: 22% of the time inside the runtime was being wasted on stack adjustment! I changed how user requests are handled: based on empirical measurements, a 3k * 2 buffer is allocated for each user up front, and the goroutines are reused through a leaky buffer. With this optimization, the time spent on stack adjustment became all but negligible.
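
The server code itself isn't shown here, but the two ideas combine roughly as in the sketch below (all names and sizes are illustrative, not the original implementation): a buffered channel serves as the leaky-buffer free list for per-user buffers, and long-lived worker goroutines are reused, so each stack is grown and shrunk at most a handful of times instead of once per request.

package main

const bufSize = 3 * 1024 // empirical per-user buffer size from the post

var freeList = make(chan []byte, 1024)

// getBuf reuses a buffer when one is available, allocating otherwise.
func getBuf() []byte {
    select {
    case b := <-freeList:
        return b
    default:
        return make([]byte, bufSize)
    }
}

// putBuf returns a buffer to the free list, or drops it on the floor
// when the list is full — the "leak" that keeps the pool bounded.
func putBuf(b []byte) {
    select {
    case freeList <- b:
    default:
    }
}

// startWorkers launches n long-lived goroutines that serve requests
// from a channel; reusing goroutines means each stack is grown once
// and then stays warm.
func startWorkers(n int, reqs <-chan func([]byte)) {
    for i := 0; i < n; i++ {
        go func() {
            for handle := range reqs {
                buf := getBuf()
                handle(buf)
                putBuf(buf)
            }
        }()
    }
}

func main() {
    reqs := make(chan func([]byte))
    startWorkers(4, reqs)

    done := make(chan struct{})
    reqs <- func(buf []byte) {
        buf[0] = 1 // pretend to serve one user from the reused buffer
        close(done)
    }
    <-done
}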

To sum up: while golang hands us the black magic of goroutines, it also brings some hidden performance problems along. Every competent golang developer should understand how the scheduler works :)
