Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
8c9b6f88b594256ad7be3a0daab383ba
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
779196c954c1103faf1447eca617ce6e9f6b138d65c8998be1dbec16e18e4459db0dcdbe56c06517521c3821bbaa5b2bfe80c7576b3979f7bff7c816c790e546
4 changes: 2 additions & 2 deletions stdlib/Distributed.version
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
DISTRIBUTED_BRANCH = master
DISTRIBUTED_SHA1 = cd9219573d736b036077dff3cadddf369516d495
DISTRIBUTED_BRANCH = ib/worker_output_customization
DISTRIBUTED_SHA1 = d45eda66cfc632a6bbd4c82e708e6a42b127f217
DISTRIBUTED_GIT_URL := https://github.com/JuliaLang/Distributed.jl
DISTRIBUTED_TAR_URL = https://api.github.com/repos/JuliaLang/Distributed.jl/tarball/$1
60 changes: 58 additions & 2 deletions test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,39 @@ include("buildkitetestjson.jl")
const longrunning_delay = parse(Int, get(ENV, "JULIA_TEST_LONGRUNNING_DELAY", "45")) * 60 # minutes
const longrunning_interval = parse(Int, get(ENV, "JULIA_TEST_LONGRUNNING_INTERVAL", "15")) * 60 # minutes

# Helper to run code with prefixed output (uses Pipe + background reader)
function with_output_prefix(f, prefix::String, io::IO, lock::ReentrantLock)
pipe = Pipe()
Base.link_pipe!(pipe; reader_supports_async=true, writer_supports_async=true)

reader_task = @async begin
try
while isopen(pipe) || bytesavailable(pipe) > 0
line = readline(pipe; keep=true)
isempty(line) && break
@lock lock begin
printstyled(io, " ", prefix, ": ", color=:light_black)
print(io, line)
endswith(line, '\n') || println(io)
end
end
catch e
e isa EOFError || rethrow()
end
end

try
redirect_stdout(pipe) do
redirect_stderr(pipe) do
f()
end
end
finally
close(pipe.in)
wait(reader_task)
end
end

(; tests, net_on, exit_on_error, use_revise, buildroot, seed) = choosetests(ARGS)
tests = unique(tests)

Expand Down Expand Up @@ -123,6 +156,9 @@ cd(@__DIR__) do
end
skipped = 0

# Track which test is running on each worker (worker_id => test_name)
worker_current_test = Dict{Int, String}()

@everywhere include("testdefs.jl")

if use_revise
Expand Down Expand Up @@ -160,6 +196,21 @@ cd(@__DIR__) do
stderr.lock = print_lock
end

# Set up hook to display test name with worker output
Distributed.worker_output_hook[] = (ident, line) -> begin
wrkr_id = tryparse(Int, ident)
test_name = wrkr_id === nothing ? nothing : get(worker_current_test, wrkr_id, nothing)
@lock print_lock begin
if test_name !== nothing
printstyled(" ", test_name, " (", ident, "): ", color=:light_black)
else
printstyled(" From worker ", ident, ": ", color=:light_black)
end
println(line)
end
return true
end

function print_testworker_stats(test, wrkr, resp)
@nospecialize resp
lock(print_lock)
Expand Down Expand Up @@ -268,6 +319,7 @@ cd(@__DIR__) do
test = popfirst!(tests)
running_tests[test] = now()
wrkr = p
worker_current_test[wrkr] = test

# Create a timer for this test to report long-running status
test_timers[test] = Timer(longrunning_delay, interval=longrunning_interval) do timer
Expand Down Expand Up @@ -304,6 +356,7 @@ cd(@__DIR__) do
Any[CapturedException(e, catch_backtrace())], time() - before
end
delete!(running_tests, test)
delete!(worker_current_test, wrkr)
if haskey(test_timers, test)
close(test_timers[test])
delete!(test_timers, test)
Expand Down Expand Up @@ -361,8 +414,10 @@ cd(@__DIR__) do
t == "SharedArrays" && (isolate = false)
before = time()
resp, duration = try
r = @invokelatest runtests(t, test_path(t), isolate, seed=seed) # runtests is defined by the include above
r, time() - before
with_output_prefix("$t (1)", stdout, print_lock) do
r = @invokelatest runtests(t, test_path(t), isolate, seed=seed) # runtests is defined by the include above
r, time() - before
end
catch e
isa(e, InterruptException) && rethrow()
Any[CapturedException(e, catch_backtrace())], time() - before
Expand Down Expand Up @@ -395,6 +450,7 @@ cd(@__DIR__) do
if @isdefined test_timers
foreach(close, values(test_timers))
end
Distributed.worker_output_hook[] = nothing
end

#=
Expand Down