Building the Death Star with ZIO Stream

Alright team, the current government - you know, the Galatic Empire - has made us responsible for their new project, a space station called “Death Star” (what a tacky name). Basically, they want a station literally the size of a moon. If you ask me, Governor Tarkin is trying to show some service.
Anyways. They know we are the best in the galaxy when it comes to organize working streams through our futuristic ZIO Stream technology
- and they are right. Let’s have a look at the requirements.
Unfinished Space Station
Have I mentioned that, besides been the size of a moon, it has to be shaped like one? That’s the first requirement. Other than that, everything else is pretty much standard, with an exception. The space station requires:
- a Moon Shaped body
- a Propulsion Engine
- Force Shields
- fleet of Tie Fighters
- a Super Laser (?!?!?!?)
At the very end, the Empire will have someone here to inspect and approve the project. Here is our end goal:
case class DeathStar(engine: PropulsionEngine,
shield: ForceShield,
fleet: List[TieFighter],
laser: SuperLaser)
Tarkin wants to keep the project in secret (sure - imagine what the taxpayer would say if they knew how their taxes are being used), so we should use a diferent name while it’s being built. Let’s call it UnfinishedSpaceStation
case class UnfinishedSpaceStation(
engine: Option[PropulsionEngine] = None,
shield: Option[ForceShield] = None,
fleet: List[TieFighter] = List.empty,
laser: Option[SuperLaser] = None) {
def installEngine(engine: PropulsionEngine): UnfinishedSpaceStation =
copy(engine = Some(engine))
def installShield(shield: ForceShield): UnfinishedSpaceStation =
copy(shield = Some(shield))
def deployFleet(fleet: List[TieFighter]): UnfinishedSpaceStation =
copy(fleet = fleet)
def installLaser(superLaser: SuperLaser): UnfinishedSpaceStation =
copy(laser = Some(superLaser))
Seems appropriate. Defining how to install/deploy every component is the easy part, the challenge will be to managed and align all the providers of every component.
The Empire is aware of the unknows, expecting it can take some time and possibly, unexpected random problems. So we should make it part of our contract:
type DStream = ZStream[Clock, Throwable, UnfinishedSpaceStation]
Every one of our streams will be a ZStream
that depends on time via Clock
, can fail, represented by Throwable
, and produce an UnfinishedSpaceStation
. DStream
is just a shortcut, which makes it easier to comunicate our needs among our providers, so we will use it initially.
Time to turn on the holoprojector and start some conversations. First things first, we don’t have an unfinished station without a body.
The initial source - SpaceStationBodyShop
Am I the only one surprised that they already had a production line of “moon shaped stations”? Maybe these days they are more popular than I imagined. Obviously it can take some time to build such a big structure, but at least we don’t need any customization from this shop.
class SpaceStationBodyShop(buildTime: Duration) {
val moonShapedSpaceStations: DStream =
Towing a moon around is not the easiest task, so adding the PropulsionEngine
is the next logical step.
Aligning streams - PropulsionEngineShop
Looking at the engines, the options are:
sealed trait PropulsionEngine
case object HyperDriveEngine extends PropulsionEngine
case object InfiniteImprobabilityDrive extends PropulsionEngine
We will order a HyperDriveEngine
. Never heard of this InfiniteImprobabilityDrive
, it seems like something from another universe. This shop is very efficient, we can order a PropulsionEngine
and they start producing it:
val engines: Stream[Nothing, PropulsionEngine] = Stream(engine).forever
This time we need to define a workflow. PropulsionEngineShop
will receive a DStream
, so they need to align their stream of engines with our stream of unfinished stations, install the engine, and return another DStream
. It can be done using zip
and map
- Zips this stream together with the specified stream
- Returns a stream made of the elements of this stream transformed withf0
(a given function)
class PropulsionEngineShop(engine: PropulsionEngine) {
val engines: Stream[Nothing, PropulsionEngine] = Stream(engine).forever
def installEngine(stations: DStream): DStream =
.map { case (station, engine) => station.installEngine(engine) }
Such a big station is an easy target, so let’s add the primary defense of any space station: a force shield.
Flattening - ForceShieldShop
According to the shop, force shields are delivered in shipments:
case class ShieldShipment(shields: List[ForceShield])
val shipments: Stream[Nothing, ShieldShipment] = Stream.repeatEffect(
They asked for help to define a work flow where they can transform their stream of shipments
into a stream of shields
, so they could install them to us. mapConcat
is the solution here:
- Maps each element to an iterable, and flattens the iterables into the output of this stream
val shields: Stream[Nothing, ForceShield] = shipments.mapConcat(_.shields)
Now, installing shields is trivial:
def installShield(stations: DStream): DStream =
.map { case (station, shield) => station.installShield(shield) }
A space station is not a respectful space station if it doesn’t have its own fleet.
Partitioning - TieFighterShop
The requirement comes from our side this time: we need a large amount of tie fighters deployed at once. Let’s tell the shop to group our fleet with grouped
before deploying it:
- Partitions the stream with specified chunkSize
Their answer was: “Like shooting rancors in a cage!”
class TieFighterShop(groupSize: Int) {
val tieFighters: Stream[Nothing, TieFighter] = Stream(TieFighter()).forever
def deployFleet(stations: DStream): DStream =
.map { case (station, fleet) => station.deployFleet(fleet) }
Now we have to deal with the project’s proper unknown.
Changing the environment: SuperLaserShop
We probably share the same opinion about the person who came up with this requirement:
Besides, this shop seems very suspicious. Apparently, SuperLaser
s are highly regulated, so the shop has “a guy” who can take anything between 5 to 15 seconds to “make it happen”.
The uncertainty means we can’t use DStream
from this point on, we should make the Random
factor explicit:
class SuperLaserShop {
val theGuySchedule = Schedule.randomDelay(5.seconds, 15.seconds)
val lasers: ZStream[Random with Clock, Nothing, SuperLaser] =
def installTheLaser(stations: DStream):
ZStream[Random with Clock, Throwable, UnfinishedSpaceStation] =
.map { case (station, laser) => station.installLaser(laser) }
I have a bad feeling about this… but sometimes we must let go of our pride and do what is requested of us.
It’s not up to us to say the project is complete, so there’s one last step in the workflow.
Filtering - EmpireAuditor
Every station has to be inspected by the auditor. When it’s approved, a new DeathStar
can be created, otherwise one can receive a StationRejected
. Essentially, he will filter and transform the stream.
- Performs a filter and map in a single step
object EmpireAuditor {
case class StationRejected(station: UnfinishedSpaceStation)
extends IllegalStateException(s"$station does not meet the necessary criteria")
class EmpireAuditor {
def inspect(stations: ZStream[Random with Clock, Throwable, UnfinishedSpaceStation]
): ZStream[Random with Clock, Throwable, DeathStar] =
.collect {
case UnfinishedSpaceStation(Some(engine), Some(shield), fleet, Some(laser)) if fleet.nonEmpty =>
DeathStar(engine, shield, fleet, laser)
case unfinished =>
throw StationRejected(unfinished)
All plans we need are in place, time to do it. As someone once said: “Do. Or do not. There is no try”.
The main workflow
So far, we have described workflows. Now we will build the Death Star. With all the descriptions in place, we just need to thread them, take the amount of DeathStar
s ordered and materialize it. ZStream gives us everything we need to achieve that:
- Threads the stream through the transformation functionf
- Takes the specified number of elements from this stream
- Runs the stream and collects all of its elements in a list
class StarshipFactory(
stationBodyShop: SpaceStationBodyShop,
propulsionEngineShop: PropulsionEngineShop,
forceShieldShop: ForceShieldShop,
tieFighterShop: TieFighterShop,
superLaserShop: SuperLaserShop,
auditor: EmpireAuditor) {
def orderDeathStar(quantity: Int):
ZIO[Random with Clock, Throwable, List[DeathStar]] =
That’s no moon. It’s a space station.
Building space stations has never been easier. Using a few ZStream
operations - zip
, map
, mapConcat
, group
, collect
, via
, take
and runCollect
- we managed to build a space station the size of a moon, combining different sources, dealing delays and unexpected random events, easily making them work together.
Have a look at the repository with the full example.
To learn more about ZIO Streams, I recommend this video, recorded a long time ago, in a galaxy far far away:
Functional Scala - Modern Data Driven Applications with ZIO Streams by Itamar Ravid
Post-credits scene
The chief engineer responsible for the project, a human called Galen Erso, installed a thermal exhaust port, in order to dissipate the excess heat produced by the Hyper Drive Engine. Despite the fact it is not necessary, it seems like no one in the Empire noticed there’s now a hole in the space station, that leads directly to the reactor system.
Not that it is an actual problem, I mean, who would think about attacking a giant space station equipped with a super laser?