Skip to content

Conversation

@RustedBones
Copy link
Contributor

@RustedBones RustedBones commented Aug 13, 2024

Update all bigtable APIs to use cloud v2 client.

Drop BigtableBulkWriter. This is TBD, but it looks to me can manually shard/groupBy and flatten to normal IO if they require this feature.

Change bigtable ScioContext api to use Admin.Table and Admin.Instance objects that cache clients instances.

public abstract class BigtableDoFn<A, B> extends GuavaAsyncLookupDoFn<A, B, BigtableDataClient> {

private final BigtableOptions options;
private final Supplier<BigtableDataSettings> settingsSupplier;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunatelly BigtableDataSettings is not serializable.

Comment on lines +140 to +141
val sideOutput = PCollectionTuple.of(BigtableWrite.BigtableWriteResult.tupleTag, result)
(tap(()), SideOutputCollections(sideOutput, data.context))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leverage WriteResultIO to expose the BigtableWriteResult as side output

Comment on lines -243 to +152
projectId: String,
instanceId: String,
tablesAndColumnFamilies: Map[String, Iterable[String]],
createDisposition: TableAdmin.CreateDisposition
): Unit =
if (!self.isTest) {
val bigtableOptions = BigtableOptions
.builder()
.setProjectId(projectId)
.setInstanceId(instanceId)
.build
TableAdmin.ensureTables(bigtableOptions, tablesAndColumnFamilies, createDisposition)
}

def ensureTables(
def ensureTable(
projectId: String,
instanceId: String,
tablesAndColumnFamilies: Map[String, Iterable[String]]
): Unit = ensureTables(
projectId,
instanceId,
tablesAndColumnFamilies,
TableAdmin.CreateDisposition.default
)

/**
* Ensure that tables and column families exist. Checks for existence of tables or creates them if
* they do not exist. Also checks for existence of column families within each table and creates
* them if they do not exist.
*
* @param tablesAndColumnFamilies
* A map of tables and column families. Keys are table names. Values are a list of column family
* names.
*/
def ensureTables(
bigtableOptions: BigtableOptions,
tablesAndColumnFamilies: Map[String, Iterable[String]],
createDisposition: TableAdmin.CreateDisposition
tableId: String,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As underlying client is cached, simplify the API an not worry if this is called multiple times.

}
final case class WriteParam private (
flowControl: Boolean = WriteParam.DefaultFlowControl,
errorHandler: ErrorHandler[BadRecord, _] = WriteParam.DefaultErrorHandler,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix #5440

@RustedBones RustedBones added this to the 0.15.0 milestone Aug 13, 2024
@codecov
Copy link

codecov bot commented Aug 13, 2024

Codecov Report

Attention: Patch coverage is 18.94737% with 77 lines in your changes missing coverage. Please review.

Project coverage is 61.57%. Comparing base (d5d20ad) to head (b7c672d).

Files with missing lines Patch % Lines
...c/main/scala/com/spotify/scio/bigtable/Admin.scala 0.00% 55 Missing ⚠️
...n/scala/com/spotify/scio/bigtable/BigTableIO.scala 50.00% 14 Missing ⚠️
...otify/scio/bigtable/syntax/ScioContextSyntax.scala 20.00% 8 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #5444      +/-   ##
==========================================
+ Coverage   61.29%   61.57%   +0.27%     
==========================================
  Files         314      314              
  Lines       11250    11213      -37     
  Branches      793      789       -4     
==========================================
+ Hits         6896     6904       +8     
+ Misses       4354     4309      -45     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@kellen kellen modified the milestones: 0.15.0, 0.16.0 Feb 13, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants