-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathstream.go
139 lines (120 loc) · 4.7 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Package stream provides a way to read and write to a synchronous buffered pipe, with multiple reader support.
package stream
import (
"errors"
"sync"
)
// ErrUnsupported is returned when an operation is not supported.
var ErrUnsupported = errors.New("unsupported")
// Stream is used to concurrently Write and Read from a File.
type Stream struct {
mu sync.Mutex
b *broadcaster
file File
fs FileSystem
seekEnd sizeOnce
closeOnce onceWithErr
}
// New creates a new Stream from the StdFileSystem with Name "name".
func New(name string) (*Stream, error) {
return NewStream(name, StdFileSystem)
}
// NewStream creates a new Stream with Name "name" in FileSystem fs.
func NewStream(name string, fs FileSystem) (*Stream, error) {
f, err := fs.Create(name)
return newStream(f, fs), err
}
// NewMemStream creates an in-memory stream with no name, and no underlying fs.
// This should replace uses of NewStream("name", NewMemFs()).
// Remove() is unsupported as there is no fs to remove it from.
func NewMemStream() *Stream {
f := newMemFile("")
return newStream(f, singletonFs{f})
}
func newStream(file File, fs FileSystem) *Stream {
return &Stream{
file: file,
fs: fs,
b: newBroadcaster(),
}
}
type singletonFs struct {
file *memFile
}
func (fs singletonFs) Create(key string) (File, error) { return nil, ErrUnsupported }
func (fs singletonFs) Open(key string) (File, error) { return &memReader{memFile: fs.file}, nil }
func (fs singletonFs) Remove(key string) error { return ErrUnsupported }
// Name returns the name of the underlying File in the FileSystem.
func (s *Stream) Name() string { return s.file.Name() }
// Write writes p to the Stream. It's concurrent safe to be called with Stream's other methods.
func (s *Stream) Write(p []byte) (int, error) {
s.mu.Lock()
defer s.mu.Unlock()
n, err := s.file.Write(p)
s.b.Wrote(n)
return n, err
}
// Close will close the active stream. This will cause Readers to return EOF once they have
// read the entire stream.
func (s *Stream) Close() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.closeOnce.Do(func() (err error) {
err = s.file.Close()
s.b.Close()
return err
})
}
// SetSeekEnd is required in order to support Range Requests. Range Requests
// require the length of a Stream to be returned by using Reader.Seek with io.SeekEnd.
// You must set this value to the expected final length of the Stream in order for
// Range Requests / SeekEnd to work correctly.
//
// This method must be called before any such Seek in order to work, and will error
// if called after a Seek with io.SeekEnd. SeekEnd will still work correctly even
// if this is not called, but it will block until the entire stream is written in
// order to actually seek to the true end position.
//
// This value can only be set once, subsequent sets will not update the SeekEnd position
// and will return an error.
//
// This method currently has no other affects on the Stream and in particular
// does not prevent Writing a different amount of bytes and Closing the stream, though this
// is undefined behavior, and this library reserves the right to define that behavior in the future.
func (s *Stream) SetSeekEnd(size int64) error {
return s.seekEnd.set(size)
}
// Remove will block until the Stream and all its Readers have been Closed,
// at which point it will delete the underlying file. NextReader() will return
// ErrRemoving if called after Remove.
func (s *Stream) Remove() error {
s.ShutdownWithErr(ErrRemoving)
return s.fs.Remove(s.file.Name())
}
// ShutdownWithErr causes NextReader to stop creating new Readers and instead return err, this
// method also blocks until all Readers and the Writer have closed.
func (s *Stream) ShutdownWithErr(err error) {
if err == nil {
return
}
s.b.PreventNewHandles(err) // no new readers can be created, but existing ones can finish, same with the writer
s.b.WaitForZeroHandles() // wait for exiting handles to finish up
}
// Cancel signals that this Stream is forcibly ending, NextReader() will fail, existing readers will fail Reads, all Readers & Writer are Closed.
// This call is non-blocking, and Remove() after this call is non-blocking.
func (s *Stream) Cancel() error {
s.b.Cancel() // all existing reads are canceled, no new reads will occur, all readers closed
return s.Close() // all writes are stopped
}
// NextReader will return a concurrent-safe Reader for this stream. Each Reader will
// see a complete and independent view of the stream, and can Read while the stream
// is written to.
func (s *Stream) NextReader() (*Reader, error) {
return s.b.NewReader(func() (*Reader, error) {
file, err := s.fs.Open(s.file.Name())
if err != nil {
return nil, err
}
return &Reader{file: file, s: s}, nil
})
}