如果golang程序想监听文件系统中某些文件的变化, 那么最普遍的做法是使用fsnotify库. 起初是由Chris Howey(github account: howeyc)开发的 , 后来受到广大开发者的喜爱, 遂单独建立仓库. 至今为止, 其 仓库 已收到了5.9k star, 这足以证明其受欢迎程度. 想了解更多关于fsnotify的历史, 可以查看 官网 .

以下源码分析基于的git commit版本为: 7f4cf4dd2b522a984eaca51d1ccee54101d3414a

1. 代码统计

使用cloc工具进行源码统计, cloc --by-file-by-lang --exclude-dir=.github --exclude-lang=YAML,Markdown [project-dir] , 结果如下(省略yaml等标记型语言相关统计):

File blank comment code
./integration_test.go 188 126 923
./inotify_test.go 69 28 358
./inotify_poller_test.go 29 10 190
./integration_darwin_test.go 31 31 105
./fsnotify_test.go 11 8 51
./windows.go 42 31 488
./kqueue.go 73 77 371
./inotify.go 45 66 226
./inotify_poller.go 16 33 138
./fsnotify.go 10 12 46
./fen.go 8 9 20
./open_mode_bsd.go 4 4 3
./open_mode_darwin.go 4 5 3
SUM: 530 440 2922

fsnotify的go代码总行数为2922行, 其中测试类代码占1627(=923+358+190+105+51)行, 实际有效代码只有1295行. 如此少的代码还支持了windows/linux/mac平台, 由此可见, 算是一个比较精简的库了.

2. 使用示例

为了先对代码有一个感性的认识, 我们以官方的示例作为开头, 代码如下:

package main
import (
    "log"
    "github.com/fsnotify/fsnotify"
func main() {
    watcher, err := fsnotify.NewWatcher() // 初始化一个空的watcher
    if err != nil {
        log.Fatal(err)
    defer watcher.Close() // 最后结束程序时关闭watcher
    done := make(chan bool)
    go func() { // 启动一个协程来单独处理watcher发来的事件
        for {
            select {
            case event, ok := <-watcher.Events: // 正常的事件的处理逻辑
                if !ok {
                    return
                log.Println("event:", event)
                if event.Op&fsnotify.Write == fsnotify.Write {
                    log.Println("modified file:", event.Name)
            case err, ok := <-watcher.Errors: // 发生错误时的处理逻辑
                if !ok {
                    return
                log.Println("error:", err)
    err = watcher.Add("/tmp/foo") // 使watcher监控/tmp/foo
    if err != nil {
        log.Fatal(err)
    <-done // 使主协程不退出

用法非常的简单:

  • 初始化一个空的fsnotify watcher
  • 写一个协程用来处理watcher推送的事件
  • 告诉watcher需要监听的文件或目录
  • 3. 构建约束

    fsnotify是一个跨平台的库, 源码中既包含了linux平台的实现逻辑, 也包含了mac平台和windows平台的实现逻辑, 此时问题就来了:

    开发者在引用了此库后, 如何才能保证编译出来的可执行文件, 只包含对应的目标平台的实现, 而不包含无关平台的实现呢? 比如开发者的编译目标平台是linux, 编译时如何去除mac和windows等无关平台的实现代码呢?

    好在golang为我们提供了构建约束(Build Constraints), 大概使用方法如下:

    // +build linux,386 darwin,!cgo
    

    上面这条注释不是普通的注释, 而是构建约束, 把它写在代码文件的顶部(package声明的上面), 会被编译器在编译时按照目标平台来判断是否编译进可执行文件中. 上面这行构建约束的意思是(linux AND 386) OR (darwin AND (NOT cgo)).

    好了, 了解了构建约束的用法, 我们看fsnotify的源码时就可以根据自己所关心的平台来详细阅读其实现.

    4. 详细解读--linux部分

    用的最多的当属linux实现部分了, 其底层是基于linux的inotify机制, 相关逻辑就在库中的inotify.go文件中.

    a. 总体思路

    按照前面使用示例的步骤, 第一步是watcher, err := fsnotify.NewWatcher(), 那么我们就来看看这里new的watcher都包含什么, 代码如下:

    // NewWatcher establishes a new watcher with the underlying OS and begins waiting for events.
    func NewWatcher() (*Watcher, error) {
        // Create inotify fd
        fd, errno := unix.InotifyInit1(unix.IN_CLOEXEC)
        if fd == -1 {
            return nil, errno
        // Create epoll
        poller, err := newFdPoller(fd)
        if err != nil {
            unix.Close(fd)
            return nil, err
        w := &Watcher{
            fd:       fd,
            poller:   poller,
            watches:  make(map[string]*watch),
            paths:    make(map[int]string),
            Events:   make(chan Event),
            Errors:   make(chan error),
            done:     make(chan struct{}),
            doneResp: make(chan struct{}),
        go w.readEvents()
        return w, nil
    

    上面代码的总体思路:

    建立一个inotify实例

    inotify实例会以一个文件描述符(fd)的形式返回给调用者, 一旦有我们watch的文件发生变化, 就能从这个fd里读到相应的事件. 但是问题是这个文件描述符需要我们自己去读取, 所以我们就需要有某种轮训机制, 就引出下面的epoll注册的用处.

    使用epoll监听实例上的事件

    把这个fd注册到epoll上, 在fd上有数据到达时, epoll就能立刻收到并返回给我们.

    初始化各种的状态上下文, 如: watches用来存放watch对象, event用来推送事件

    启动监听协程

    b. 事件监听协程

    上面的代码最后启动了一个监听协程go w.readEvents(), 我们就来看看这里发生了什么, 代码如下:

    为使篇幅简练, 省略冗余代码

    func (w *Watcher) readEvents() {
        var (...) // 各种变量
        defer close(...) // 关闭上下文的各种资源
        for {
            if w.isClosed() { return }
            ok, errno = w.poller.wait() // 程序阻塞在这行, 直到epoll监听到相关事件为止
            if ... { continue } // 各种error处理逻辑
            n, errno = unix.Read(w.fd, buf[:]) // 走到这里的话就是有事件发生, 所以这里读出事件到buffer里, 放到下面处理
            if ... { continue } // 各种error处理逻辑
            if n < unix.SizeofInotifyEvent { // 当读到的事件小于16字节(一个事件结构体的单位大小), 异常处理逻辑
                continue
            var offset uint32
            // 此时我们也不知道读了几个事件到buffer里
            // 所以我们就用offset记录下当前所读到的位置偏移量, 直到读完为止
            // 这个for循环结束条件是: offset累加到了某个值, 以至于剩余字节数不够读取出一整个inotify event结构体
            for offset <= uint32(n-unix.SizeofInotifyEvent) {
                // 强制把地址值转换成inotify结构体
                raw := (*unix.InotifyEvent)(unsafe.Pointer(&buf[offset]))
                mask := uint32(raw.Mask) // 所发生的事件以掩码形式表示
                nameLen := uint32(raw.Len) // 当监听的是个目录时, 目录中发生事件的文件名会包含在结构体中, 这里的len就是文件名的长度
                if mask&unix.IN_Q_OVERFLOW != 0 { ... } // mask格式错误, 向Errors chan发送事件
                w.mu.Lock() // 由于可能会对上下文进行删除操作, 所以锁住
                // Wd是我们所watch的, 并且此次发生事件了的文件描述符
                // 我们可以从构建好的上下文中取出这个文件描述符所对应的文件名
                name, ok := w.paths[int(raw.Wd)] 
                // 如果发生删除事件, 也一并在上下文中删掉这个文件名
                if ok && mask&unix.IN_DELETE_SELF == unix.IN_DELETE_SELF {
                    delete(w.paths, int(raw.Wd))
                    delete(w.watches, name)
                w.mu.Unlock() // 解锁
                if nameLen > 0 { // 当我们watch是一个目录的时候, 其下面的文件发生事件时, 就会导致这个nameLen大于0
                    // 此时读取文件名字(文件名就在inotify event结构体的后面), 强制把地址值转换成长度4096的byte数组
                    bytes := (*[unix.PathMax]byte)(unsafe.Pointer(&buf[offset+unix.SizeofInotifyEvent]))[:nameLen:nameLen]
                    // 拼接路径(文件名会以\000为结尾表示, 所以要去掉)
                    name += "/" + strings.TrimRight(string(bytes[0:nameLen]), "\000")
                event := newEvent(name, mask) // 生成一个event
                if !event.ignoreLinux(mask) { // 如果这个事件没有被忽略, 那么发送到Events chan
                    select {
                    case w.Events <- event:
                    case <-w.done:
                        return
                // 移动offset偏移量到下个inotify event结构体
                offset += unix.SizeofInotifyEvent + nameLen
    

    c. 添加watch路径

    我们通过err = watcher.Add("/tmp/foo")来让watcher去watch路径/tmp/foo, Add方法就是在inotify里注册路径, 代码如下:

    // Add starts watching the named file or directory (non-recursively).
    func (w *Watcher) Add(name string) error {
        name = filepath.Clean(name) // 获取标准路径, 如/tmp//////too经过Clean后就成了/tmp/too
        if w.isClosed() {
            return errors.New("inotify instance already closed")
        const agnosticEvents = unix.IN_MOVED_TO | unix.IN_MOVED_FROM |
            unix.IN_CREATE | unix.IN_ATTRIB | unix.IN_MODIFY |
            unix.IN_MOVE_SELF | unix.IN_DELETE | unix.IN_DELETE_SELF
        var flags uint32 = agnosticEvents
        w.mu.Lock()
        defer w.mu.Unlock()
      watchEntry := w.watches[name] // 取出上下文里的watch路径(如果存在的话)
        if watchEntry != nil {
            flags |= watchEntry.flags | unix.IN_MASK_ADD
        wd, errno := unix.InotifyAddWatch(w.fd, name, flags) // 添加watch路径
        if wd == -1 {
            return errno
        if watchEntry == nil { // 如果上下文里不存在此路径, 表明这是一个新的watch, 添加到上下文
            w.watches[name] = &watch{wd: uint32(wd), flags: flags}
            w.paths[wd] = name
        } else { // 如果在上下文中存在, 则更新上下文
            watchEntry.wd = uint32(wd)
            watchEntry.flags = flags
        return nil
    

    d. 删除watch路径

    // Remove stops watching the named file or directory (non-recursively).
    func (w *Watcher) Remove(name string) error {
        name = filepath.Clean(name) // 获取标准路径
        // 涉及到多协程可能同时对同一个watch项写, 所以锁住
        w.mu.Lock()
        defer w.mu.Unlock() // 最后解锁
        watch, ok := w.watches[name]
        if ... { ... } // 错误处理
        // 删除上下文里的相应watch项
        delete(w.paths, int(watch.wd))
        delete(w.watches, name)
        // 删除inotify中watch的fd
        success, errno := unix.InotifyRmWatch(w.fd, watch.wd)
        if ... { ... } // 错误处理
        return nil
    

    e. poller部分(基于epoll)

    我们上面看到在func NewWatcher() (*Watcher, error)函数中调用了poller, err := newFdPoller(fd), 这是将inotify的fd注册在epoll上, 以实现高效的fs监听, 代码如下:

    为使篇幅简练, 省略冗余代码

    func newFdPoller(fd int) (*fdPoller, error) {
        var errno error
        poller := emptyPoller(fd)
        defer func() {
            if errno != nil {
                poller.close()
        poller.fd = fd
        // 要使用epoll的话, 需要使用EpollCreate函数为其单独创建一个fd
        poller.epfd, errno = unix.EpollCreate1(unix.EPOLL_CLOEXEC)
        if ... { return ... } // error处理
        // 为实现优雅退出, 需要创建一个管道, pipe[0]用来读, pipe[1]用来写
      // 在介绍watcher的Close函数时会分析这部分的逻辑
        errno = unix.Pipe2(poller.pipe[:], unix.O_NONBLOCK|unix.O_CLOEXEC)
        if ... { return ... } // error处理
        // 注册inotify的fd到epoll
        event := unix.EpollEvent{
            Fd:     int32(poller.fd),
            Events: unix.EPOLLIN,
        errno = unix.EpollCtl(poller.epfd, unix.EPOLL_CTL_ADD, poller.fd, &event)
        if ... { return ... } // error处理
        // 注册管道的fd到epoll
        event = unix.EpollEvent{
            Fd:     int32(poller.pipe[0]),
            Events: unix.EPOLLIN,
        errno = unix.EpollCtl(poller.epfd, unix.EPOLL_CTL_ADD, poller.pipe[0], &event)
        if ... { return ... } // error处理
        return poller, nil
    

    函数func newFdPoller(fd int) (*fdPoller, error)在epoll的fd上注册了两个文件, 一个是inotify的, 另一个是其用来实现优雅退出的pipe[0].

    我们在上面的*事件监听协程 func (w *Watcher) readEvents()*小节中提到的ok, errno = w.poller.wait()语句阻塞直到收到事件才会返回, 来看看具体poller(也就是上面的epoll)对事件的处理逻辑, 代码如下:

    为使篇幅简练, 省略冗余代码

    func (poller *fdPoller) wait() (bool, error) {
        // 总共监听两个fd: 1.inotify 2.优雅退出所需的pipe[0]
        // 每个fd有三种可能的事件, 所以最多可以触发6个事件
        // 准备一个大于6的slice
        events := make([]unix.EpollEvent, 7)
        for {
            // 阻塞wait在epoll的fd上, 参数-1表示不会超时
            // 一旦有事件产生, 就会发到events里
            n, errno := unix.EpollWait(poller.epfd, events, -1) 
            if ... { ... } // 各种异常处理
            // 以下就是收到正常事件的处理
            ready := events[:n]
            epollhup := false
            epollerr := false
            epollin := false
            for _, event := range ready {
                if event.Fd == int32(poller.fd) {
                    if event.Events&unix.EPOLLHUP != 0 {
                        // This should not happen, but if it does, treat it as a wakeup.
                        epollhup = true
                    if event.Events&unix.EPOLLERR != 0 {
                        // If an error is waiting on the file descriptor, we should pretend
                        // something is ready to read, and let unix.Read pick up the error.
                        epollerr = true
                    if event.Events&unix.EPOLLIN != 0 {
                        // inotify有事件
                        epollin = true
                if event.Fd == int32(poller.pipe[0]) {
                    if event.Events&unix.EPOLLHUP != 0 {
                        // Write pipe descriptor was closed, by us. This means we're closing down the
                        // watcher, and we should wake up.
                    if event.Events&unix.EPOLLERR != 0 {
                        // If an error is waiting on the pipe file descriptor.
                        // This is an absolute mystery, and should never ever happen.
                        return false, errors.New("Error on the pipe descriptor.")
                    if event.Events&unix.EPOLLIN != 0 {
                        // 收到程序发来的优雅退出事件, 将调用clearWake以使管道排空
                        err := poller.clearWake()
                        if err != nil {
                            return false, err
            if epollhup || epollerr || epollin {
                return true, nil
            return false, nil
    

    clearWake函数, 代码如下

    func (poller *fdPoller) clearWake() error {
       // You have to be woken up a LOT in order to get to 100!
       buf := make([]byte, 100)
       n, errno := unix.Read(poller.pipe[0], buf) // 读取pipe[0]中的退出信号
       if ... { ... } // 错误处理
       return nil
    

    那么pipe[0]中的信号是怎么来的呢? 也就是说必须有一个地方往pipe[1]中写数据. 其实, 我们示例代码中采用defer方式调用了watcher.Close()函数, 而其最重要的一步就是调用w.poller.wake()函数, 代码如下:

    为使篇幅简练, 省略冗余代码

    // Close the write end of the poller.
    func (poller *fdPoller) wake() error {
        buf := make([]byte, 1)
        // 这里在pipe[1]写入了一个字符当做退出信号
        n, errno := unix.Write(poller.pipe[1], buf)
        if ... { ... } // 错误处理
        return nil
    

    题外话: 关于这个优雅退出的早期设计其实不是这样的, 但是思路差不多. 有兴趣可以去看看fsnotify的早期提交

    至此, 关于fsnotify对linux的实现就分析完了.

    分类:
    后端
    标签: