Skip to content

Commit

Permalink
Fix deadlock on close() and piping again, fix links, v1.0.4
Browse files Browse the repository at this point in the history
  • Loading branch information
Tremeschin committed Aug 4, 2024
1 parent 8b39729 commit ee27776
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
runs-on: ${{matrix.os}}
strategy:
matrix:
os: [ubuntu-latest, windows-latest, macos-13, macos-14]
os: [ubuntu-latest, windows-latest, macos-14]

env:
CIBW_BUILD: cp37-* cp38-* cp39-* cp310-* cp311-* cp312-*
Expand Down
4 changes: 2 additions & 2 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ rye add turbopipe

# 🚀 Usage

See also the [**Examples**](examples) folder for more controlled usage, and [**ShaderFlow**](https://github.com/BrokenSource/ShaderFlow/blob/main/ShaderFlow/Scene.py) usage of it!
See also the [**Examples**](https://github.com/BrokenSource/TurboPipe/tree/main/examples) folder for more controlled usage, and [**ShaderFlow**](https://github.com/BrokenSource/ShaderFlow/blob/main/ShaderFlow/Scene.py) usage of it!

```python
import subprocess
Expand Down Expand Up @@ -87,7 +87,7 @@ ffmpeg.wait()
> - 🚀: Threaded `ffmpeg.stdin.write(buffer.read())` with a queue (similar to turbopipe)
> - 🌀: The magic of `turbopipe.pipe(buffer, ffmpeg.stdin.fileno())`
>
> Also see [`benchmark.py`](examples/benchmark.py) for the implementation
> Also see [`benchmark.py`](https://github.com/BrokenSource/TurboPipe/blob/main/examples/benchmark.py) for the implementation
✅ Check out benchmarks in a couple of systems below:

Expand Down
5 changes: 3 additions & 2 deletions examples/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ def FFmpeg() -> Generator[subprocess.Popen, None, None]:

@contextlib.contextmanager
def Progress():
with tqdm.tqdm(total=TOTAL_FRAMES, unit="Frame", smoothing=0) as frame_bar, \
tqdm.tqdm(total=TOTAL_BYTES, unit="B", smoothing=0, unit_scale=True) as byte_bar:
with tqdm.tqdm(total=TOTAL_FRAMES, unit="Frame", smoothing=0, mininterval=1/30) as frame_bar, \
tqdm.tqdm(total=TOTAL_BYTES, unit="B", smoothing=0, mininterval=1/30, unit_scale=True) as byte_bar:
def next():
byte_bar.update(BYTES_PER_FRAME)
frame_bar.update(1)
Expand All @@ -80,3 +80,4 @@ def next():
turbopipe.sync()

turbopipe.close()

5 changes: 2 additions & 3 deletions examples/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def ffmpeg(self, width: int, height: int, x264_preset: Optional[str] = None):
process = subprocess.Popen(command, stdin=subprocess.PIPE)
yield process
finally:
turbopipe.sync()
turbopipe.close()
process.stdin.close()
process.wait()

Expand Down Expand Up @@ -153,8 +153,7 @@ def run(self):

for frame in tqdm.tqdm(
desc=f"Processing (Run #{run}) ({method.value} {width}x{height})",
iterable=range(total_frames),
smoothing=0, unit=" Frame"
iterable=range(total_frames), smoothing=0, unit=" Frame", mininterval=1/30,
):
buffer = buffers[frame % nbuffer]

Expand Down
22 changes: 14 additions & 8 deletions turbopipe/_turbopipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,22 @@
#define PY_SSIZE_T_CLEAN
#include <Python.h>

// Standard library
#include <functional>
#include <iostream>
#include <chrono>

// Threading
#include <condition_variable>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>

// Data structure
#include <unordered_set>
#include <unordered_map>
#include <deque>
#include <chrono>
#include <functional>
#include <set>

// Third party
#include "gl_methods.hpp"

#define dict std::unordered_map
Expand Down Expand Up @@ -121,8 +127,8 @@ class TurboPipe {

private:
dict<int, dict<int, condition_variable>> pending;
dict<int, unordered_set<int>> queue;
dict<int, deque<Work>> stream;
dict<int, set<int>> queue;
dict<int, thread> threads;
dict<int, mutex> mutexes;
condition_variable signal;
Expand All @@ -139,12 +145,12 @@ class TurboPipe {
pending[file][hash].wait(lock, [this, file, hash] {
return queue[file].find(hash) == queue[file].end();
});
pending[file].erase(hash);
queue[file].insert(hash);
}

// Add another job to the queue
stream[file].push_back(work);
queue[file].insert(hash);
this->running = true;
lock.unlock();

// Each file descriptor has its own thread
Expand Down
2 changes: 1 addition & 1 deletion turbopipe/version.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
#!/usr/bin/env python3
__version__ = "1.0.3"
__version__ = "1.0.4"
print(__version__)

0 comments on commit ee27776

Please sign in to comment.