Scio 0.7: a deep dive

May 30, 2019 Published by Claire McGinty

Introduction

Large-scale data processing is a critical component of Spotify’s business model. It drives music recommendations, artist payouts based on stream counts, and insights about how users interact with Spotify. Every day we capture hundreds of terabytes of event data, in addition to database snapshots and derived datasets. It’s imperative that engineers who want to work with this data can quickly write and execute application-level code without worrying about the low-level semantics of Map/Reduce frameworks, provisioning the right amount of compute power, or writing extensive boilerplate code for every job. To that end, Spotify engineers developed Scio, a Scala API for Apache Beam and Google Cloud Dataflow similar to frameworks like Spark or Scalding.

Scio has been in development for almost four years, and we’re happy to announce the release of Scio 0.7.0! With thousands of production workflows running Scio, tens of thousands of batch Dataflow jobs a week, and hundreds of concurrently running streaming jobs, we’ve been able to analyze and address common pain points for our users, as well as add optimizations to reduce memory footprint and lower overall cost. Among the Spotify workflows that have upgraded to Scio 0.7, we’ve seen up to a 25% reduction in cost and a 20% reduction in runtime.

Here are some of the updates we’ve introduced:

ScioIO

On the API side, we’ve always kept in mind that many Scio users are coming from a Java/Python background and are new to Scala, or functional programming in general. Our goal is to make it programmatically simple to work with mixed data sources and sinks like Avro, Protobuf, BigQuery, Pub/Sub and Bigtable, while maintaining strong type safety guarantees.

Prior to Scio 0.7 most of our I/O logic lived inside ScioContext and SCollection directly, or in implicit value classes for them. These methods generally returned Taps, which represent fixed external datasets that can be accessed either through an in-memory iterator or as an SCollection. For example, Avro output looked like this:

sealed trait SCollection[T] extends PCollectionWrapper[T] {
  def saveAsAvroFile(path: String,
                     numShards: Int = 0,
                     schema: Schema = null,
                     suffix: String = "",
                     codec: CodecFactory = CodecFactory.deflateCodec(6),
                     metadata: Map[String, AnyRef] = Map.empty):
  Future[Tap[T]] =
    if (context.isTest) {
      putTestOutput(path) // record test output and return an in-memory Tap
    } else {
      // Check if T is a GenericRecord or SpecificRecord
      // Directly invoke Beam’s AvroIO methods
      // Return a Tap for the written .avro file
    }
}

We identified a few problems with this approach:

  • Every I/O method repeated the same conditionals around test behavior.
  • Method parameters were chosen as a best guess as to what the user would care about, with sometimes arbitrary default values.
  • The behavior of a Tap was unclear and didn’t make sense for non-atomic I/Os.
  • The enclosing classes were becoming increasingly lengthy and complex.

The new Scio version manages I/O with classes that extend a ScioIO trait, simplified below:

trait ScioIO[T] {
  type ReadP
  type WriteP
  def testId: String

  private[scio] def readWithContext(sc: ScioContext,
                                    params: ReadP): SCollection[T] = {
    if (sc.isTest) { readTest(sc, params) }
    else { read(sc, params) }
  }
  private[scio] def writeWithContext(data: SCollection[T],
                                     params: WriteP): Future[Tap[T]] = {
    if (data.context.isTest) { writeTest(data, params) }
    else { write(data, params) }
  }

  protected def read(sc: ScioContext, params: ReadP): SCollection[T]
  protected def write(data: SCollection[T], params: WriteP): Future[Tap[T]]

  protected def readTest(sc: ScioContext,
                         params: ReadP): SCollection[T] = {
    getTestInput(testId)
  }
  protected def writeTest(data: SCollection[T],
                          params: WriteP): Future[Tap[T]] = {
    putTestOutput(testId)
  }

  def tap(params: ReadP): Tap[T]
}

All I/Os must define a parameterized type T. For some implementations T is a static type — for example, TextIO always extends ScioIO[String]. Others, like BigQueryIO, which accepts case class types annotated with the @BigQueryType macro, keep T generic, constrained only by T <:< HasAnnotation.

I/Os must also use typed read and write params with consistent default values. Write-only or read-only I/Os can be enforced by assigning ReadP or WriteP, respectively, to type Nothing, making it impossible to invoke read() or write(), since no instances of the Nothing type exist in Scala.
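
As a toy illustration of the Nothing trick, consider the simplified stand-in trait below (it is not Scio’s real ScioIO; the names SimpleIO and ConsoleSink are made up for this sketch). Setting ReadP to Nothing makes read() uncallable, because no value of type Nothing can ever be constructed to pass as its argument:

// Simplified stand-in for ScioIO, for illustration only.
trait SimpleIO[T] {
  type ReadP
  type WriteP
  def read(params: ReadP): Seq[T]
  def write(data: Seq[T], params: WriteP): Unit
}

// A write-only sink: ReadP = Nothing means read() can never be invoked.
object ConsoleSink extends SimpleIO[String] {
  type ReadP = Nothing
  type WriteP = Unit
  def read(params: Nothing): Seq[String] = params // unreachable by construction
  def write(data: Seq[String], params: Unit): Unit = data.foreach(println)
}

Any call site that tried to invoke ConsoleSink.read would fail to compile, since the caller has no way to produce a value of type Nothing.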

Convenience methods, like .saveAsAvroFile() shown above, still exist in package objects, but with much simpler implementations: they just construct the appropriate ScioIO class and directly call .readWithContext() or .writeWithContext(). These methods check whether the job is running in the unit testing context and delegate to either .read()/.write() or .readTest()/.writeTest(). The latter have default implementations in the ScioIO trait which use mocked data, but can be overridden in a ScioIO implementation if desired.

Coders

The second big change in Scio 0.7 is in the intermediate serialization layer. Any time intermediate data is passed across worker nodes (for joins, group-bys, auto-scaling, etc.), those values must be serialized into bytes and deserialized at the destination site. Apache Beam requires each data type to correspond to a registered Coder, which handles the conversion to and from bytes. Coders themselves are serialized in Beam during job creation and deserialized before being used. Every method in a Coder must be thread-safe, and the encoded result must be verifiably deterministic, meaning:

  • two values that compare as equal (via Object.equals() or Comparable.compareTo(), if supported) have the same encoding.
  • the Coder always produces a canonical encoding, which is the same for an instance of an object even if produced on different computers at different times.
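
To make the determinism requirement concrete, here is a small, self-contained check using Beam’s coder utilities (assuming the Beam Java SDK is on the classpath; the DeterminismCheck object is just an illustrative harness, not Scio or Beam code):

import java.util.Arrays
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.util.CoderUtils

object DeterminismCheck {
  def main(args: Array[String]): Unit = {
    val coder = StringUtf8Coder.of()
    // A deterministic coder yields identical bytes for equal values, run after run.
    val a = CoderUtils.encodeToByteArray(coder, "spotify")
    val b = CoderUtils.encodeToByteArray(coder, "spotify")
    assert(Arrays.equals(a, b))
    // Beam can also check determinism up front: this throws a
    // NonDeterministicException for coders that can't guarantee it.
    coder.verifyDeterministic()
  }
}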

Beam itself provides a registry of coders for many common Java types, but Scio is responsible for encoding any Scala types or custom Java classes. Prior to Scio 0.7, we deferred coder derivation to Kryo, a serialization library for Java, and its Scala extension Chill, at runtime. This solution worked well for the most part, and was on par with what most data processing libraries (Flink, Spark, and Scalding, for example) were doing, with the advantage that users didn’t have to care about explicitly writing and registering coders. Over time, however, we noticed a few downsides:

  • We wanted to preserve Kryo state on a thread-local basis to allow for reuse, since creating Kryo instances is costly in terms of time and memory. However, we found that in jobs with high thread turnover (like long-running streaming jobs) this state wasn’t being garbage collected properly, leading to memory leaks. Additionally, using a common pool of instances resulted in threads that spent a long time blocking as they waited for access. We’ve spent a lot of time debugging and providing workarounds for this.
  • Since Coder derivation happened at runtime under the hood, users wouldn’t be aware of any serialization issues until their job was in progress, wasting money and developer time. Our decision to make serialization opaque to the user had the downside of making these issues difficult to debug–Kryo coders are very dynamic, so it’s hard to reverse-engineer Kryo state for a misbehaving job. Kryo’s permissiveness enabled some bad practices in user pipeline code. For example, our users frequently work with Avro records, whose String fields are deserialized into the CharSequence type. It’s common for these fields to end up as keys in groups or joins without being explicitly converted to Strings. Luckily Kryo happens to serialize the CharSequence type as a UTF-8 String, which is usually what users want, but the truth is that CharSequences should not be serialized at all as they’re not truly deterministic. Per its documentation:
    This interface does not refine the general contracts of the equals and hashCode methods. The result of comparing two objects that implement CharSequence is therefore, in general, undefined. Each object may be implemented by a different class, and there is no guarantee that each class will be capable of testing its instances for equality with those of the other. It is therefore inappropriate to use arbitrary CharSequence instances as elements in a set or as keys in a map.

    Another example is the serialization of null values. By default Kryo assumes any value might be null, which often ends up hiding errors in pipeline code. In contrast, Beam requires users to wrap their coders in a NullableCoder to explicitly indicate they expect null values; otherwise, an exception is thrown at runtime when a null value is encountered (see the sketch after this list).
  • Kryo coders are not always efficient, especially for Scala types. After a lot of trial and error we came up with a list of tuning tips for custom Kryo serializers, but they remained hard to profile and test. We also observed that Kryo serializers for Scala classes with mutable fields would occasionally throw runtime errors upon being deserialized in Beam.
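
To illustrate the null-handling difference called out above, here is a minimal sketch (the NullableExample object is ours, not Scio code) of Beam’s explicit opt-in for nullable values:

import org.apache.beam.sdk.coders.{NullableCoder, StringUtf8Coder}

object NullableExample {
  // A plain StringUtf8Coder fails at runtime if asked to encode null...
  val strict = StringUtf8Coder.of()
  // ...while wrapping it in NullableCoder explicitly opts in to encoding null values.
  val lenient: NullableCoder[String] = NullableCoder.of(StringUtf8Coder.of())
}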

Enter Magnolia in Scio 0.7! Since Scio’s I/O implementations are heavily coupled with Scala case classes (for example, our BigQuery and Avro macro annotations are used on case classes), we decided that instead of using Kryo at runtime, Scio would infer Coders at compile time, using Magnolia typeclass derivation whenever possible. Per Magnolia’s documentation:

Magnolia is a generic macro for automatic materialization of typeclasses for datatypes composed from case classes (products) and sealed traits (coproducts). It supports recursively-defined datatypes out-of-the-box, and incurs no significant time-penalty during compilation. If derivation fails, error messages are detailed and informative.

Magnolia-based Coders rely on typeclass derivation for Coder[T]. They work by recursively traversing the ADT of the case class or sealed trait down to its leaf nodes and encoding those leaf primitives into bytes. We’ve introduced our own Coder type, simply defined as:

sealed trait Coder[T] extends Serializable

This trait is extended by more specific implementations, one of which is the new Magnolia-based Coder. Based on the definition of T (a sealed trait, or a case class), either dispatch() or combine() will construct the coder.

trait CoderDerivation {
  import magnolia._

  type Typeclass[T] = Coder[T]

  def combine[T](ctx: CaseClass[Coder, T]): Coder[T] = ...
  def dispatch[T](sealedTrait: SealedTrait[Coder, T]): Coder[T] = ...

  /* wraps the implicitly scoped Coder as a com.spotify.scio.coders.Coder,
     pruning annotations that would cause serialization to fail */
  implicit def gen[T]: Coder[T] = macro CoderMacros.wrappedCoder[T]
}

We defined a Coder grammar to help compose Coders for complex types out of simpler building blocks. Existing Kryo and Beam Coders are examples of building blocks, but there are a few new algebraic types that can be seen in examples in the Scio REPL:

  • Record: Given a case class, derive coders for all its parameters and combine them.

    scio> case class FooClass(id: Int)
    defined class FooClass

    scio> Coder[FooClass]
    res0: com.spotify.scio.coders.Coder[FooClass] = Record(FooClass, (id, Beam(VarIntCoder)))

  • Transform: Given an existing Coder for A and a function mapping a Beam Coder[A] to a Scio Coder[B], derive a Coder[B].

    scio> Coder[List[String]]
    res0: com.spotify.scio.coders.Coder[List[String]] = Transform(Beam(StringUtf8Coder), <function1>)

    You can also see in this example that the implicitly found coder for String, Beam’s Utf8 coder, is part of the grammar.

  • Disjunction: Given a sealed trait, provide a map of Coders for all its defined subtypes.

    scio> :paste
    // Entering paste mode (ctrl-D to finish)

    sealed trait MyTrait
    case class FooClass(id: String) extends MyTrait
    case class BarClass(timestamp: Long) extends MyTrait
    case class BazClass(age: Int) extends MyTrait

    Coder[MyTrait]

    // Exiting paste mode, now interpreting.

    res0: com.spotify.scio.coders.Coder[MyTrait] = Disjunction(MyTrait, Map(0 -> Record(FooClass, (id, Beam(StringUtf8Coder))), 1 -> Record(BarClass, (timestamp, Beam(BigEndianLongCoder))), 2 -> Record(BazClass, (age, Beam(VarIntCoder)))))

Ultimately, each Coder implementation is specifically materialized into an instance of Beam’s org.apache.beam.sdk.coders.Coder type, which requires the familiar decode() and encode() methods. Materialization happens when Scio interacts directly with Beam’s PCollection type and is required to set a Coder for that type:

val context: ScioContext
val internal: PCollection[T]

def someFn[U: Coder](fn: DoFn[T, U]): SCollection[U] = context
  .wrap(internal.apply(ParDo.of(fn)))
  .setCoder(CoderMaterializer.beam(context, Coder[U]))

We’ve kept Kryo as a fallback coder for cases Magnolia doesn’t support (such as classes that aren’t case classes, and Java classes), with Scio logging a warning to the user for each Coder this applies to. Implementing the fallback at compile time is done with the help of Shapeless’s LowPriority trait mixed into the Magnolia typeclass derivation for Coder, simplified below:

trait CoderDerivation {
  ...
}

trait LowPriorityFallbackCoder extends CoderDerivation {
  def fallback[T](implicit lp: shapeless.LowPriority): Coder[T] =
    // logs warning message and creates a Kryo coder
    macro CoderMacros.issueFallbackWarning[T] 
}

Deriving Coders is still, for the most part, done behind the scenes, and most users can upgrade to 0.7 without worrying about any changes to serialization. They are brought into scope implicitly in the new API: for example, a Join is now defined as:

class PairSCollectionFunctions[K, V](val self: SCollection[(K, V)]) {

  def join[W: Coder](that: SCollection[(K, W)])
                    (implicit koder: Coder[K], voder: Coder[V]):
  SCollection[(K, (V, W))] = {
    ...
  }

}

If a user wants to implement their own coder for a class (for example, to avoid the Kryo fallback), they can define an implicit Coder instance and bring it into implicit scope, preferably in the companion object of the class:

class Foo(field: String)

object Foo {
  import java.io.{InputStream, OutputStream}
  import org.apache.beam.sdk.coders.AtomicCoder

  implicit def coderFoo: Coder[Foo] = Coder.beam(new AtomicCoder[Foo] {
    def decode(in: InputStream): Foo = ???
    def encode(ts: Foo, out: OutputStream): Unit = ???
  })
}
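
With that in place, a job like the following sketch picks up coderFoo implicitly wherever a Coder[Foo] is required, instead of falling back to Kryo (the job name, output path, and pipeline logic here are hypothetical):

import com.spotify.scio._
import com.spotify.scio.values.SCollection

object FooJob {
  def main(cmdlineArgs: Array[String]): Unit = {
    val (sc, _) = ContextAndArgs(cmdlineArgs)

    // parallelize needs an implicit Coder[Foo]; coderFoo from Foo's companion
    // object is resolved here, so no Kryo fallback warning is logged.
    val foos: SCollection[Foo] = sc.parallelize(Seq(new Foo("a"), new Foo("b")))

    foos.map(_.toString).saveAsTextFile("gs://example-bucket/foos")
    sc.close()
  }
}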

Because performance had been such a pain point with Kryo serialization, before releasing Scio 0.7 we invested in profiling various jobs with and without the new Coder implementation. Our hypothesis was that this change would provide the most benefit to shuffle-heavy jobs, where data was frequently serialized and deserialized across worker nodes. We measured our jobs using the Dataflow metrics provided by GCP.

Across our daily batch benchmark jobs, which test individual operations like groups and joins on smaller, fixed datasets, we saw on average a 25% reduction in memory and physical disk usage and, on jobs that used Google’s shuffle service, an average 40% reduction in shuffle data processed. Since these jobs run on smaller datasets and take under 10 minutes to complete, we didn’t see significant improvements in elapsed time. When we tested Scio 0.7 on large-scale production Spotify use cases, however, we did see real performance improvements. For a job that aggregates track listening history over a 20-day period, we saw:

  • 25% reduction in cost
  • 20% reduction in runtime
  • 12% reduction in max worker count
  • 20% reduction both in VCPU hours and memory usage in GB/hours
  • 27% reduction in shuffle data processed (on the order of 30 terabytes less)
  • 18% reduction in average intermediate PCollection size

What else?

Aside from revamped IO and Coders, other improvements in Scio 0.7.0 include:

Conclusion

We’re excited for people to try out Scio 0.7! To get started, we’ve released a Migration Guide that includes automated Scalafix rules for major API changes, and troubleshooting for common issues. You can get in touch with us by filing an issue on GitHub, joining our Gitter, or contacting the scio-users Google group.

