From 60f655569691abc0c68d72450e7adcb9d2437b8b Mon Sep 17 00:00:00 2001 From: Ben Konz Date: Mon, 5 Feb 2024 13:14:28 -0500 Subject: [PATCH] normalize field mode --- build.sbt | 8 ++++++- .../com/spotify/ratatool/BigQueryUtil.scala | 12 +++++++++++ .../com/spotify/ratatool/diffy/BigDiffy.scala | 21 +++++++++++-------- .../scalacheck/TableRowGenerator.scala | 4 ++-- 4 files changed, 33 insertions(+), 12 deletions(-) create mode 100644 ratatool-common/src/main/scala/com/spotify/ratatool/BigQueryUtil.scala diff --git a/build.sbt b/build.sbt index 06c31077..458d9b2a 100644 --- a/build.sbt +++ b/build.sbt @@ -146,7 +146,13 @@ lazy val releaseSettings = Seq( name = "Rafal Wojdyla", email = "ravwojdyla@gmail.com", url = url("https://twitter.com/ravwojdyla") - ) + ), + Developer( + id = "benk", + name = "Ben Konz", + email = "benkonz16@gmail.com", + url = url("https://benkonz.github.io/") + ), ) ) diff --git a/ratatool-common/src/main/scala/com/spotify/ratatool/BigQueryUtil.scala b/ratatool-common/src/main/scala/com/spotify/ratatool/BigQueryUtil.scala new file mode 100644 index 00000000..fa4dae40 --- /dev/null +++ b/ratatool-common/src/main/scala/com/spotify/ratatool/BigQueryUtil.scala @@ -0,0 +1,12 @@ +package com.spotify.ratatool + +object BigQueryUtil { + // a null TableFieldSchema mode can be treated as "NULLABLE", which is the + // default value according to the docs, so return "NULLABLE" if fieldMode is null + // otherwise return fieldMode + def getFieldModeWithDefault(fieldMode: String): String = + fieldMode match { + case null => "NULLABLE" + case _ => fieldMode + } +} diff --git a/ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala b/ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala index d86e4476..6ced920f 100644 --- a/ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala +++ b/ratatool-diffy/src/main/scala/com/spotify/ratatool/diffy/BigDiffy.scala @@ -17,35 +17,33 @@ package com.spotify.ratatool.diffy -import java.nio.ByteBuffer import com.google.api.services.bigquery.model.{TableFieldSchema, TableRow, TableSchema} -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding import com.google.protobuf.AbstractMessage +import com.spotify.ratatool.BigQueryUtil.getFieldModeWithDefault import com.spotify.ratatool.Command import com.spotify.ratatool.io.ParquetIO -import com.spotify.ratatool.samplers.{AvroSampler, ParquetSampler} +import com.spotify.ratatool.samplers.AvroSampler import com.spotify.scio._ import com.spotify.scio.avro._ import com.spotify.scio.bigquery._ -import com.spotify.scio.parquet.avro._ import com.spotify.scio.bigquery.client.BigQuery import com.spotify.scio.bigquery.types.BigQueryType import com.spotify.scio.coders.Coder import com.spotify.scio.coders.kryo._ import com.spotify.scio.io.ClosedTap +import com.spotify.scio.parquet.avro._ import com.spotify.scio.values.SCollection import com.twitter.algebird._ -import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType -import org.apache.avro.{Schema, SchemaCompatibility} import org.apache.avro.generic.GenericRecord import org.apache.avro.specific.SpecificRecordBase import org.apache.beam.sdk.io.TextIO -import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding import org.slf4j.{Logger, LoggerFactory} +import java.nio.ByteBuffer import scala.annotation.tailrec -import scala.jdk.CollectionConverters._ import scala.collection.mutable +import scala.jdk.CollectionConverters._ import scala.language.higherKinds import scala.reflect.ClassTag import scala.util.{Failure, Success, Try} @@ -555,7 +553,12 @@ object BigDiffy extends Command with Serializable { case (Some(f), None) => f case (None, Some(f)) => f case (Some(fx), Some(fy)) => - assert(fx.getType == fy.getType && fx.getMode == fy.getMode) + val fxMode = getFieldModeWithDefault(fx.getMode) + val fyMode = getFieldModeWithDefault(fy.getMode) + assert( + fx.getType == fy.getType && fxMode == fyMode, + f"field ${fx.getName} in lhs, type: ${fx.getType} mode: $fxMode, and rhs, type: ${fy.getType} mode: $fyMode, do not match" + ) if (fx.getType == "RECORD") { fx.setFields( mergeFields(fx.getFields.asScala.toList, fy.getFields.asScala.toList).asJava diff --git a/ratatool-scalacheck/src/main/scala/com/spotify/ratatool/scalacheck/TableRowGenerator.scala b/ratatool-scalacheck/src/main/scala/com/spotify/ratatool/scalacheck/TableRowGenerator.scala index 29a0853e..8111f5c3 100644 --- a/ratatool-scalacheck/src/main/scala/com/spotify/ratatool/scalacheck/TableRowGenerator.scala +++ b/ratatool-scalacheck/src/main/scala/com/spotify/ratatool/scalacheck/TableRowGenerator.scala @@ -19,8 +19,8 @@ package com.spotify.ratatool.scalacheck import java.nio.ByteBuffer import java.util - import com.google.api.services.bigquery.model.{TableFieldSchema, TableRow, TableSchema} +import com.spotify.ratatool.BigQueryUtil.getFieldModeWithDefault import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding import org.joda.time._ import org.joda.time.format.DateTimeFormat @@ -144,7 +144,7 @@ trait TableRowGeneratorOps { case t => throw new RuntimeException(s"Unknown type: $t") } - fieldSchema.getMode match { + getFieldModeWithDefault(fieldSchema.getMode) match { case "REQUIRED" => genV() case "NULLABLE" => Arbitrary.arbBool.arbitrary.flatMap { e =>