Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ export
worker_id_from_socket,
cluster_cookie,
start_worker,
worker_output_hook,

# Used only by shared arrays.
check_same_host
Expand Down
42 changes: 40 additions & 2 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,53 @@ function start_worker(out::IO, cookie::AbstractString=readline(stdin); close_std
end


"""
worker_output_hook

A `Ref` holding an optional callback function for customizing worker output display.
When set to a function `f(ident::AbstractString, line::AbstractString) -> Bool`,
it will be called for each line of worker output. If the function returns `true`,
the line is considered handled and will not be printed with the default prefix.
If the function returns `false` or if `worker_output_hook[]` is `nothing`,
the default `"From worker \$ident: "` prefix is used.

This allows test harnesses or other callers to provide richer context about
what each worker is doing (e.g., which test is running) instead of just the worker id.

# Example
```julia
Distributed.worker_output_hook[] = (ident, line) -> begin
test_name = get_current_test_for_worker(parse(Int, ident))
if test_name !== nothing
println("[", test_name, "] (worker ", ident, "): ", line)
return true
end
return false
end
```
"""
const worker_output_hook = Ref{Union{Nothing, Function}}(nothing)

function redirect_worker_output(ident, stream)
t = @async while !eof(stream)
line = readline(stream)
if startswith(line, " From worker ")
if startswith(line, "From worker ")
# stdout's of "additional" workers started from an initial worker on a host are not available
# on the master directly - they are routed via the initial worker's stdout.
println(line)
else
println(" From worker $(ident):\t$line")
handled = false
hook = worker_output_hook[]
if hook !== nothing
try
handled = hook(ident, line)::Bool
catch
handled = false
end
end
if !handled
println("From worker $(ident): $line")
end
end
end
errormonitor(t)
Expand Down
2 changes: 1 addition & 1 deletion src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)

if config.io !== nothing
let pid = pid
redirect_worker_output(pid, notnothing(config.io))
redirect_worker_output("$pid", notnothing(config.io))
end
end

Expand Down
Loading