Revert "parallelRun, parallelMap: init"
This reverts commit 4d12b83ae0437ad26bd0158178eadfac1c608c2d. The stdin read-lock mechanism can cause the whole process pool to lock up indefinitely.
parent c419691ccb
commit 6b8e23b9f2
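For context, the reverted parallelRun (shown in full below) serializes reads from the shared stdin with a one-byte token passed through a FIFO, using fd 3 as the read side and fd 4 as the write side of the lock. The following sketch is only an illustration of one way such a token lock can wedge the whole pool; it is not the reverted code itself and not necessarily the exact hang that prompted this revert. If the worker currently holding the token exits before writing it back, every other worker blocks in its lock read forever, and that read never returns EOF because each worker also keeps a write end of the FIFO open. The sketch assumes bash and Linux semantics for opening a FIFO read-write, and it opens a single read-write fd instead of the separate 3< / 4> pair purely to keep the demo short.

#!/usr/bin/env bash
# Minimal sketch of the wedge; illustrative only, not the reverted implementation.
lock=$(mktemp -u)
mkfifo "$lock"

# Worker A takes the single token and exits without writing it back, standing
# in for a worker that is killed or errors out between "acquire" and "release".
(
  exec 3<>"$lock"            # read-write open so opening the FIFO does not block
  read -r -n1 <&3            # acquire the token
  echo "worker A: took the token, exiting without releasing it"
) &
printf 't' >"$lock"          # seed the one and only token
wait                         # worker A is gone, and the token is gone with it

# Worker B now blocks on the token read indefinitely. It never sees EOF,
# because (like every parallelRun worker) it also holds a write end open.
(
  exec 3<>"$lock"
  echo "worker B: waiting for a token that will never arrive"
  read -r -n1 <&3
  echo "worker B: got the token"   # never reached
) &
b=$!

sleep 2
if kill -0 "$b" 2>/dev/null; then
  echo "worker B is still blocked: the pool is wedged"
  kill "$b"
fi
rm -f "$lock"

The full reverted files and the hook wiring follow in the diff below.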
@@ -1,89 +0,0 @@
# Parallel execution utilities
# These functions provide a framework for parallel processing of jobs from stdin

# parallelRun - Execute a command in parallel across multiple cores
#
# Reads null-delimited jobs from stdin and distributes them across NIX_BUILD_CORES
# worker processes. Each worker executes the provided command, receiving jobs
# via stdin in null-delimited format.
#
# Usage: some_producer | parallelRun command [args...]
#
# The command receives jobs one at a time via stdin (null-delimited).
#
# Example:
#   find . -name '*.log' -print0 | parallelRun sh -c '
#     while read -r -d "" file; do gzip "$file"; done
#   '
parallelRun() {
  local pids
  local lock
  pids=()
  lock=$(mktemp -u)
  mkfifo "$lock"
  for ((i=0; i<NIX_BUILD_CORES; i++)); do
    {
      exec 3<"$lock" # fd-3 = read side of lock
      exec 4>"$lock" # fd-4 = write side of lock (push token back)
      local job

      while :; do
        # Acquire the lock: blocks until a token can be read
        read -r -n1 >/dev/null <&3

        # read one job from stdin
        # This is guarded by the lock above in order to prevent
        # multiple workers from reading from stdin simultaneously.
        if ! IFS= read -r -d '' job; then
          # If stdin is closed, release lock and exit
          printf 'x' >&4
          break
        fi

        # Release the lock: write a token back to the lock FIFO
        printf 'y' >&4

        # Forward job to the worker process' stdin
        printf '%s\0' "$job"

      done \
        | "$@" # launch the worker process
    } &
    pids[$i]=$!
  done
  # launch the workers by writing a token to the lock FIFO
  printf 'a' >"$lock" &
  # Wait for all workers to finish
  for pid in "${pids[@]}"; do
    if ! wait "$pid"; then
      echo "A parallel job failed with exit code $? (check for errors above)" >&2
      echo -e "Failing Command:\n $@" >&2
      exit 1
    fi
  done
  rm "$lock"
}

# parallelMap - Apply a shell function to each job in parallel
#
# A higher-level wrapper around parallelRun that applies a shell function to each
# null-delimited job from stdin. The shell function receives each job as its last
# argument, after any additional arguments.
#
# Usage: some_producer | parallelMap shell_function [additional_args...]
#
# The shell function is called as: shell_function [additional_args...] job
# for each job read from stdin.
#
# Example:
#   compress() { gzip "$1"; }
#   find . -name '*.log' -print0 | parallelMap compress
parallelMap() {
  _wrapper() {
    while IFS= read -r -d '' job; do
      "$@" "$job"
    done
  }
  parallelRun _wrapper "$@"
  unset -f _wrapper
}
@@ -1,20 +0,0 @@
{
  stdenv,
}:
{
  # test based on bootstrap tools to prevent rebuilding stdenv on each change
  parallel = derivation {
    name = "test-parallel-hook";
    system = stdenv.system;
    builder = "${stdenv.bootstrapTools}/bin/bash";
    PATH = "${stdenv.bootstrapTools}/bin";
    args = [
      "-c"
      ''
        . ${../parallel.sh}
        . ${./test-parallel.sh}
      ''
    ];
    meta = { };
  };
}
@@ -1,146 +0,0 @@
export NIX_BUILD_CORES=4

echo "Testing worker distribution..."

# Generate 100 jobs to ensure all workers get some
for i in {1..100}; do
  printf "job%d\0" $i
done | parallelRun sh -c '
  while IFS= read -r -d "" job; do
    sleep 0.05 # Simulate some work
    echo "Worker $$ processed $job" >> /tmp/worker-output
  done
'

# Check that all 4 workers were actually utilized
worker_count=$(sort /tmp/worker-output | cut -d" " -f2 | sort -u | wc -l)
if [ "$worker_count" -ne 4 ]; then
  echo "ERROR: Expected exactly 4 workers, got $worker_count"
  cat /tmp/worker-output
  exit 1
fi
echo "SUCCESS: All 4 workers participated"
rm -f /tmp/worker-output

echo "Testing error propagation..."

# Test that errors from workers are propagated
if printf "job1\0job2\0job3\0" | parallelRun sh -c '
  while IFS= read -r -d "" job; do
    if [ "$job" = "job2" ]; then
      echo "Worker failing on $job" >&2
      exit 1
    fi
    echo "Worker processed $job"
  done
' 2>/dev/null; then
  echo "ERROR: Expected command to fail but it succeeded"
  exit 1
else
  echo "SUCCESS: Error was properly propagated"
fi

echo "Testing error message..."

error_output=$(printf "job1\0job2\0job3\0" | parallelRun sh -c '
  while IFS= read -r -d "" job; do
    if [ "$job" = "job2" ]; then
      echo "Worker failing on $job" >&2
      exit 1
    fi
    echo "Worker processed $job"
  done
' 2>&1 || true)

if [[ "$error_output" != *"job failed"* ]]; then
  echo "ERROR: Expected 'job failed' in error message, got: $error_output"
  exit 1
fi
echo "SUCCESS: Error message was displayed"

echo "Testing Verify all jobs are processed when no errors occur..."
|
|
||||||
|
|
||||||
# Generate jobs and count processed ones
for i in {1..10}; do
  printf "job%d\0" $i
done | parallelRun sh -c '
  while IFS= read -r -d "" job; do
    echo "$job" >> /tmp/processed-jobs
  done
'

processed_count=$(wc -l < /tmp/processed-jobs)
if [ "$processed_count" -ne 10 ]; then
  echo "ERROR: Expected 10 jobs processed, got $processed_count"
  exit 1
fi
echo "SUCCESS: All 10 jobs were processed"
rm -f /tmp/processed-jobs

echo "All parallelRun tests passed!"

# ---------------------------------------------------------------------

echo "Testing parallelMap basic functionality..."

# Define a test function
testFunc() {
  echo "Processing: $1" >> /tmp/map-output
}

# Test that parallelMap calls the function with each job
for i in {1..5}; do
  printf "item%d\0" $i
done | parallelMap testFunc

# Check all jobs were processed
processed_map_count=$(wc -l < /tmp/map-output)
if [ "$processed_map_count" -ne 5 ]; then
  echo "ERROR: Expected 5 items processed by parallelMap, got $processed_map_count"
  exit 1
fi
echo "SUCCESS: parallelMap processed all 5 items"
rm -f /tmp/map-output

echo "Testing parallelMap error propagation..."

# Define a function that fails on specific input
failFunc() {
  if [ "$1" = "item2" ]; then
    echo "Function failing on $1" >&2
    exit 1
  fi
  echo "Function processed $1"
}

# Test that errors are propagated
if printf "item1\0item2\0item3\0" | parallelMap failFunc 2>/dev/null; then
  echo "ERROR: Expected parallelMap to fail but it succeeded"
  exit 1
else
  echo "SUCCESS: parallelMap error was properly propagated"
fi

echo "Testing parallelMap with additional arguments..."

# Define a function that uses additional arguments
argFunc() {
  echo "$1: $2" >> /tmp/map-args-output
}

# Test with additional arguments
for i in {1..3}; do
  printf "value%d\0" $i
done | parallelMap argFunc "PREFIX"

# Check output contains the prefix
if ! grep -q "PREFIX: value1" /tmp/map-args-output; then
  echo "ERROR: parallelMap did not pass additional arguments correctly"
  cat /tmp/map-args-output
  exit 1
fi
echo "SUCCESS: parallelMap passed additional arguments correctly"
rm -f /tmp/map-args-output

echo "All parallelRun and parallelMap tests passed!"
touch $out
@@ -78,7 +78,6 @@ let
 ../../build-support/setup-hooks/move-sbin.sh
 ../../build-support/setup-hooks/move-systemd-user-units.sh
 ../../build-support/setup-hooks/multiple-outputs.sh
-../../build-support/setup-hooks/parallel.sh
 ../../build-support/setup-hooks/patch-shebangs.sh
 ../../build-support/setup-hooks/prune-libtool-files.sh
 ../../build-support/setup-hooks/reproducible-builds.sh
@@ -212,6 +212,4 @@ with pkgs;
 build-environment-info = callPackage ./build-environment-info { };

 rust-hooks = recurseIntoAttrs (callPackages ../build-support/rust/hooks/test { });
-
-setup-hooks = recurseIntoAttrs (callPackages ../build-support/setup-hooks/tests { });
 }