Multithreading in PowerShell with Runspaces

PowerShell is often used to do many things, one thing at a time. One way to multitask is to use "jobs" through commands like Start-Job for locally run jobs, or Invoke-Command -AsJob for jobs on either the local machine or remote machines. In PowerShell 7 you even have the option to use ForEach-Object -Parallel { ... }!

# Available in PowerShell 5.1 and PowerShell 7+
Start-Job { 'Hi there!' } | Receive-Job -Wait -AutoRemoveJob

# Hi there!

# Available in PowerShell 7+
'Apples', 'Oranges' | Foreach-Object -Parallel { $_ }

# Result:
# Apples
# Oranges

If there are already options for running tasks in parallel in PowerShell, why would I go through the trouble of doing it any other way?

The first reason is that I am using PowerShell 5.1 for the foreseeable future as most of my time in PowerShell is spent maintaining a wrapper module for Milestone's .NET Framework-based MIP SDK. I also help internal and external customers to leverage it to automate time-consuming VMS reporting, configuration, and maintenance tasks. And as it happens, there is no parallel option with Foreach-Object in PowerShell 5.1.

Second, the built-in commands for working with "jobs" are lovely, but each job runs in a new powershell.exe process which means for my purposes, I would have to log in to the Milestone XProtect VMS using the MIP SDK within each individual job. And the objects sent to the job on creation, and the objects received from the job when completed, are serialized. So we can't easily reuse complex types passed back and forth without careful planning. There's also some extra overhead in spawning a new process compared to instantiating some new types/threads in the current process.

Here's a quick demo showing how the object types change when passing into jobs:

class Place {
    [string] $Name
}
$place = [place]@{
    Name = 'Oregon'
}
$script = {
    param([object]$place)
    $place.GetType().Name
}
$job = Start-Job $script -ArgumentList $place

$originalType = $place.GetType()
$returnType = $job | Receive-Job -Wait -AutoRemoveJob

"The original class type was '$originalType' but when we sent it into a job, it became a '$returnType'"

# Result:
# The original class type was 'Place' but when we sent it into a job, it became a 'PSObject'

And here's another example showing how jobs really run in different processes:

$jobPID = Start-Job { $PID } | Receive-Job -Wait -AutoRemoveJob
"The local PID is $PID and the job PID was $jobPID"

# Result:
# The local PID is 45900 and the job PID was 39116

To avoid the pitfalls of jobs in PowerShell 5.1 I use runspaces. And I'll be honest, it can be complicated and error-prone. There's even a ready-made module called PoshRSJob that takes away most of the pain by giving you cmdlets that look and function very much like Start-Job and Receive-Job. Instead of the *-Job cmdlets, you use *-RSJob with the usual Get/Wait/Receive verbs and it pretty much just works!

Here are the previous two examples again, this time using PoshRSJob, demonstrating that there is no type serialization and that the runspace jobs run under the same process ID:

class Place {
    [string] $Name
}
$place = [place]@{
    Name = 'Oregon'
}
$script = {
    param([object]$place)
    $place.GetType().Name
}
$job = Start-RSJob $script -ArgumentList $place

$originalType = $place.GetType()
$returnType = $job | Wait-RSJob | Receive-RSJob

"The original class type was '$originalType' and it was still a '$returnType' when running in the RSJob!"

$jobPID = Start-RSJob { $PID } | Wait-RSJob | Receive-RSJob
"The local PID is $PID and the job PID was $jobPID"

# Result:
# The original class type was 'Place' and it was still a 'Place' when running in the RSJob!
# The local PID is 45900 and the job PID was 45900

So that worked pretty well, and the cmdlets are really easy to use. However, I've found that at least in my use cases, my own classes below tend to out-perform the module by a wide margin. That's not to disparage the module - there's a very real possibility I'm using it wrong, and my own code is far more limited than the cmdlets provided in PoshRSJob. But the second reason I'm using my own code for managing runspaces is so that I avoid unnecessary dependencies in my modules/code. If my needs expand beyond the 80 lines of code below and I figure out how to meet or beat the performance of my implementation using PoshRSJob, I would switch in a heartbeat.

Finally, the code!

Without further ado, here's the full code, which consists of three small PowerShell classes. After the code block we'll break it apart, make sense of all the bits and pieces, and finish with an example of how to use it.

# Contains the output from the script passed to LocalJobRunner.AddJob, in addition to any errors thrown in the script if present.
class LocalJobResult {
    [object[]] $Output
    [System.Management.Automation.ErrorRecord[]] $Errors
}

# Pairs the IAsyncResult object returned by PowerShell.BeginInvoke() with the PowerShell instance we need to call EndInvoke() on later.
class LocalJob {
    [System.Management.Automation.PowerShell] $PowerShell
    [System.IAsyncResult] $Result
}

# Centralizes the complexity of running multiple commands/scripts at a time and receiving the results, including errors, when they complete.
class LocalJobRunner : IDisposable {
    hidden [System.Management.Automation.Runspaces.RunspacePool] $RunspacePool
    hidden [System.Collections.Generic.List[LocalJob]] $Jobs
    [timespan] $JobPollingInterval = (New-Timespan -Seconds 1)

    # Default constructor creates an underlying runspace pool with a max size matching the number of processors
    LocalJobRunner () {
        $this.Initialize($env:NUMBER_OF_PROCESSORS)
    }

    # Optionally you may manually specify a max size for the underlying runspace pool.
    LocalJobRunner ([int]$MaxSize) {
        $this.Initialize($MaxSize)
    }

    hidden [void] Initialize([int]$MaxSize) {
        $this.Jobs = New-Object System.Collections.Generic.List[LocalJob]
        $this.RunspacePool = [runspacefactory]::CreateRunspacePool(1, $MaxSize)
        $this.RunspacePool.Open()
    }

    # Accepts a scriptblock and a set of parameters. A new PowerShell instance will be created, attached to the runspace pool, and the results can be collected later in a call to ReceiveJobs.
    [void] AddJob([scriptblock]$scriptblock, [hashtable]$parameters) {
        # Fall back to an empty hashtable so AddParameters() never receives $null.
        if ($null -eq $parameters) { $parameters = @{} }
        $shell = [powershell]::Create()
        $shell.RunspacePool = $this.RunspacePool
        $asyncResult = $shell.AddScript($scriptblock).AddParameters($parameters).BeginInvoke()
        $this.Jobs.Add([LocalJob]@{
            PowerShell = $shell
            Result = $asyncResult
        })
    }

    # Returns the output from any completed jobs in an object that also includes any errors if present.
    [LocalJobResult[]] ReceiveJobs() {
        $completedJobs = $this.Jobs | Where-Object { $_.Result.IsCompleted }
        $completedJobs | Foreach-Object { $this.Jobs.Remove($_) }
        $results = $completedJobs | Foreach-Object {
            [LocalJobResult]@{
                Output = $_.PowerShell.EndInvoke($_.Result)
                Errors = $_.PowerShell.Streams.Error
            }

            $_.PowerShell.Dispose()
        }
        return $results
    }

    # Block until all jobs have completed. The list of jobs will be polled on an interval of JobPollingInterval, which is 1 second by default.
    [void] Wait() {
        while ($this.Jobs.Result.IsCompleted -contains $false) {
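            # Sleep in milliseconds: in PowerShell 5.1 the -Seconds parameter is an integer, so a sub-second interval would be rounded.
            Start-Sleep -Milliseconds $this.JobPollingInterval.TotalMilliseconds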
        }
    }

    # Returns $true while any jobs, running or completed, have not yet been collected using ReceiveJobs. Use to implement your own polling strategy instead of using Wait.
    [bool] HasPendingJobs() {
        return ($this.Jobs.Count -gt 0)
    }

    # Make sure to dispose of this class so that the underlying runspace pool gets disposed.
    [void] Dispose() {
        $this.Jobs.Clear()
        $this.RunspacePool.Close()
        $this.RunspacePool.Dispose()
    }
}

Breaking down the code

Let's start by talking about the Initialize method inside the LocalJobRunner. By the way, I landed on "LocalJob" as an alternative to the "PSRemotingJob" class used by the *-Job cmdlets. I could have used "RSJob", but I felt like that was already taken by the PoshRSJob module.

Anyway, the initialize method:

    hidden [void] Initialize([int]$MaxSize) {
        $this.Jobs = New-Object System.Collections.Generic.List[LocalJob]
        $this.RunspacePool = [runspacefactory]::CreateRunspacePool(1, $MaxSize)
        $this.RunspacePool.Open()
    }

It's marked as hidden because I don't want people using the method. If this were a C# class, I would have made it "private" but that doesn't exist in PowerShell classes. The best we can do is hidden which means it's not immediately visible as a member of the type. But much like you can reveal hidden folders in Windows, you can reveal hidden members in PowerShell - just use Get-Member -Force.
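Here's a small sketch of that behavior. The HiddenDemo class and its property names are made up purely for illustration and are not part of the job runner code:

```powershell
# Hypothetical class used only to illustrate the 'hidden' keyword.
class HiddenDemo {
    [string] $Visible = 'shown'
    hidden [string] $Secret = 'tucked away'
}

$demo = [HiddenDemo]::new()

# Default view: hidden members are left out of the listing.
$defaultNames = ($demo | Get-Member -MemberType Property).Name

# With -Force, hidden members are listed as well.
$forcedNames = ($demo | Get-Member -MemberType Property -Force).Name

"Default: $($defaultNames -join ', ')"
"Forced:  $($forcedNames -join ', ')"

# Hidden is not private: the member is still reachable by name.
$secretValue = $demo.Secret
"Secret value: $secretValue"
```

So hidden is a hint to callers rather than a form of access control.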

It accepts a $MaxSize value which is used to define the maximum size of the [runspacepool]. And it creates a list of [LocalJob] objects which keep track of each PowerShell instance and the IAsyncResult object we need to retrieve the result of the scriptblock passed into that instance.
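To get a feel for what the maximum size means in practice, here's a minimal standalone sketch that doesn't use the classes above. It assumes a pool capped at 2 runspaces and four tasks that each sleep for one second; the variable names are my own for this illustration:

```powershell
# Minimal sketch: a pool capped at 2 runspaces, given 4 one-second tasks.
$pool = [runspacefactory]::CreateRunspacePool(1, 2)
$pool.Open()
try {
    $stopwatch = [System.Diagnostics.Stopwatch]::StartNew()

    # Queue all 4 tasks up front; the pool only runs 2 of them at any moment.
    $tasks = foreach ($i in 1..4) {
        $shell = [powershell]::Create()
        $shell.RunspacePool = $pool
        $null = $shell.AddScript('Start-Sleep -Seconds 1; "done"')
        [pscustomobject]@{
            PowerShell = $shell
            Result     = $shell.BeginInvoke()
        }
    }

    # EndInvoke blocks until the given task has finished.
    $output = foreach ($task in $tasks) {
        $task.PowerShell.EndInvoke($task.Result)
        $task.PowerShell.Dispose()
    }
    $stopwatch.Stop()

    "Collected $(@($output).Count) results in $([math]::Round($stopwatch.Elapsed.TotalSeconds, 1)) seconds"
}
finally {
    $pool.Close()
    $pool.Dispose()
}
```

With the cap at 2, the four tasks should take roughly twice as long as a single task. Raising the cap to 4 should bring the total down to about the length of one task, plus some overhead for spinning up the runspaces.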

Here's how you can create an instance of this [LocalJobRunner] class:

# Create a default instance with a maximum size matching the number of processors
$runner = [LocalJobRunner]::new()

# Or create an instance with a maximum size of 4 runspaces
$runner = [LocalJobRunner]::new(4)

Next we have the AddJob($scriptblock, $parameters) method. This is how to give the [LocalJobRunner] something to do. To use it, pass in a scriptblock, and if your scriptblock takes any parameters, you can pass those in here too.

    # Accepts a scriptblock and a set of parameters. A new PowerShell instance will be created, attached to the runspace pool, and the results can be collected later in a call to ReceiveJobs.
    [void] AddJob([scriptblock]$scriptblock, [hashtable]$parameters) {
        # Fall back to an empty hashtable so AddParameters() never receives $null.
        if ($null -eq $parameters) { $parameters = @{} }
        $shell = [powershell]::Create()
        $shell.RunspacePool = $this.RunspacePool
        $asyncResult = $shell.AddScript($scriptblock).AddParameters($parameters).BeginInvoke()
        $this.Jobs.Add([LocalJob]@{
            PowerShell = $shell
            Result = $asyncResult
        })
    }

The method creates a new PowerShell instance and attaches it to the RunspacePool created in the Initialize() method earlier. It then adds your scriptblock to the PowerShell instance, followed by the parameters, and then calls BeginInvoke(). This method tells the PowerShell instance to start chewing on the scriptblock we provided, but it doesn't "block" until the task is completed. Instead, it returns an IAsyncResult immediately, and we pair up the PowerShell instance with the IAsyncResult and add it to a list so we can check in on it later.
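If the non-blocking part is hard to picture, this standalone sketch may help. It uses a single PowerShell instance with no runspace pool, which is a simplification for illustration, and times how long BeginInvoke() and EndInvoke() each take to return:

```powershell
# Sketch: BeginInvoke returns right away, EndInvoke waits for the script to finish.
$shell = [powershell]::Create()
$null = $shell.AddScript('Start-Sleep -Seconds 2; "finished"')

$stopwatch = [System.Diagnostics.Stopwatch]::StartNew()
$asyncResult = $shell.BeginInvoke()

# Capture these immediately, while the script is still sleeping.
$beginReturnedAfterMs = $stopwatch.ElapsedMilliseconds
$wasCompleted = $asyncResult.IsCompleted
"BeginInvoke returned after $beginReturnedAfterMs ms; IsCompleted = $wasCompleted"

# EndInvoke blocks until the script is done, then hands back its output.
$output = $shell.EndInvoke($asyncResult)
$stopwatch.Stop()
"EndInvoke returned '$output' after about $([math]::Round($stopwatch.Elapsed.TotalSeconds)) seconds"

$shell.Dispose()
```

The gap between the two timings is time the calling thread is free to spend on other work, such as queueing more jobs.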

To receive the results of our "jobs", we can call ReceiveJobs() which will look for any completed jobs by checking the IsCompleted property of all the IAsyncResult objects in the Jobs list. Any result where IsCompleted is $true signifies that the scriptblock has finished and we either have some output waiting for us, or errors, or both.

    # Returns the output from any completed jobs in an object that also includes any errors if present.
    [LocalJobResult[]] ReceiveJobs() {
        $completedJobs = $this.Jobs | Where-Object { $_.Result.IsCompleted }
        $completedJobs | Foreach-Object { $this.Jobs.Remove($_) }
        $results = $completedJobs | Foreach-Object {
            [LocalJobResult]@{
                Output = $_.PowerShell.EndInvoke($_.Result)
                Errors = $_.PowerShell.Streams.Error
            }

            $_.PowerShell.Dispose()
        }
        return $results
    }

The method builds a temporary list of just the completed jobs, removes them from the Jobs list, and enumerates through each of them in a ForEach-Object where we retrieve the results by calling EndInvoke($_.Result). By passing the IAsyncResult object into EndInvoke, we get all of the output produced by the scriptblock we passed in earlier. That can include one or more objects, and technically they can be objects of different types, so we stuff all the results into an array of [object]. We also collect any errors the scriptblock wrote to its error stream before disposing of the PowerShell instance and returning the [LocalJobResult] objects with our results/errors.
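One thing worth knowing about error handling here: as far as I can tell, only non-terminating errors (such as those from Write-Error) end up in Streams.Error, while a terminating error (such as a throw) surfaces as an exception from EndInvoke() itself. The standalone sketch below illustrates the difference under that assumption, using throwaway PowerShell instances rather than the classes above:

```powershell
# Case 1: a non-terminating error is recorded in the error stream, and output still arrives.
$shell = [powershell]::Create()
$null = $shell.AddScript('Write-Output "partial output"; Write-Error "non-terminating"')
$output = $shell.EndInvoke($shell.BeginInvoke())
$errorCount = $shell.Streams.Error.Count
$shell.Dispose()
"Output: '$output', errors in stream: $errorCount"

# Case 2: a terminating error makes EndInvoke itself throw.
$shell = [powershell]::Create()
$null = $shell.AddScript('throw "terminating"')
$caught = $null
try {
    $null = $shell.EndInvoke($shell.BeginInvoke())
}
catch {
    $caught = $_
}
finally {
    $shell.Dispose()
}
"EndInvoke threw: $($null -ne $caught)"
```

If your scriptblocks might throw, one option is to wrap the EndInvoke() call in a try/catch so that a single failed job doesn't prevent the remaining results from being collected.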

The Wait() method gives us an easy way to block execution until all of the jobs have completed, after which we can retrieve the results using the ReceiveJobs() method.

The HasPendingJobs() method lets us check whether there are any jobs, running or completed, that we haven't picked up yet. In the example below, you'll see how I use that in a polling loop to pick up results as they become available.

When we're done with this [LocalJobRunner] instance, it's a good idea to dispose of it. In the example below, I'm doing this in the finally block to make sure that it gets disposed, no matter what kind of errors might happen in our try block.

Finally, on to the contrived example! In the script below, we create a simple scriptblock which sleeps for a random 1-2 seconds, then outputs the value of $Name which we expect to be passed in by the caller of the scriptblock. We also write an error ~25% of the time (whenever a random number from 1 to 4 comes up as 1) to show how errors are attached to the results.

In the try block, we continually call HasPendingJobs() to see if there are any jobs left for us to pick up. If so, we use ReceiveJobs() to pick up any completed jobs and write them out to the terminal. Then we sleep a little bit to avoid high CPU usage before we check to see if there are any more jobs to pick up.

Finally, the line in our finally block is executed no matter what happens up above, so the [LocalJobRunner] instance gets cleaned up and we don't have any runspace instances sitting around in the background wasting resources.

$script = {
    param([string]$Name)
    Start-Sleep -Seconds (Get-Random -Minimum 1 -Maximum 3)
    Write-Output $Name
    if ((Get-Random -Minimum 1 -Maximum 5) -eq 1) {
        Write-Error "Random error!"
    }
}
$runner = [LocalJobRunner]::new()
try {
    1..24 | ForEach-Object {
        $jobParams = @{ Name = "Job Number $_" }
        $runner.AddJob($script, $jobParams)
    }
    while ($runner.HasPendingJobs()) {
        foreach ($result in $runner.ReceiveJobs()) {
            Write-Output $result.Output
            $result.Errors | Foreach-Object { Write-Error $_ }
        }
        Start-Sleep -Milliseconds 100
    }
}
finally {
    $runner.Dispose()
}