Refactor bigtable API to use v2 client #5444
public abstract class BigtableDoFn<A, B> extends GuavaAsyncLookupDoFn<A, B, BigtableDataClient> {

  private final BigtableOptions options;
  private final Supplier<BigtableDataSettings> settingsSupplier;
Unfortunately, BigtableDataSettings is not serializable.
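To illustrate the point about serialization, here is a minimal sketch of why holding a supplier helps when the settings object itself cannot be serialized. Everything in it is a hypothetical stand-in built on the JDK only: `FakeSettings`, `SerializableSupplier`, and `LookupFn` are not Scio or Bigtable classes, and the actual supplier type used in this PR may differ.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Supplier;

// Hypothetical stand-in for a settings class that is NOT Serializable.
final class FakeSettings {
    final String projectId;
    final String instanceId;

    FakeSettings(String projectId, String instanceId) {
        this.projectId = projectId;
        this.instanceId = instanceId;
    }
}

// Hypothetical helper: a Supplier whose lambdas can be Java-serialized.
interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

// Hypothetical DoFn-like class: only the supplier travels with the object.
final class LookupFn implements Serializable {
    private static final long serialVersionUID = 1L;

    private final SerializableSupplier<FakeSettings> settingsSupplier;
    private transient FakeSettings settings; // not serialized, rebuilt lazily

    LookupFn(SerializableSupplier<FakeSettings> settingsSupplier) {
        this.settingsSupplier = settingsSupplier;
    }

    // The lambda captures two Strings, which are serializable.
    static LookupFn forInstance(String projectId, String instanceId) {
        return new LookupFn(() -> new FakeSettings(projectId, instanceId));
    }

    // Builds the settings on first use and reuses them afterwards.
    FakeSettings settings() {
        if (settings == null) {
            settings = settingsSupplier.get();
        }
        return settings;
    }

    // Java-serialization round trip, similar in spirit to shipping the
    // function to a worker.
    static LookupFn roundTrip(LookupFn fn) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(fn);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LookupFn) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

Calling `LookupFn.roundTrip(LookupFn.forInstance("my-project", "my-instance"))` succeeds because only the lambda and its captured strings are written; storing a `FakeSettings` field directly would instead fail with `NotSerializableException`.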
val sideOutput = PCollectionTuple.of(BigtableWrite.BigtableWriteResult.tupleTag, result)
(tap(()), SideOutputCollections(sideOutput, data.context))
Leverage WriteResultIO to expose the BigtableWriteResult as side output
    projectId: String,
    instanceId: String,
    tablesAndColumnFamilies: Map[String, Iterable[String]],
    createDisposition: TableAdmin.CreateDisposition
  ): Unit =
    if (!self.isTest) {
      val bigtableOptions = BigtableOptions
        .builder()
        .setProjectId(projectId)
        .setInstanceId(instanceId)
        .build
      TableAdmin.ensureTables(bigtableOptions, tablesAndColumnFamilies, createDisposition)
    }

  def ensureTables(
  def ensureTable(
    projectId: String,
    instanceId: String,
    tablesAndColumnFamilies: Map[String, Iterable[String]]
  ): Unit = ensureTables(
    projectId,
    instanceId,
    tablesAndColumnFamilies,
    TableAdmin.CreateDisposition.default
  )

  /**
   * Ensure that tables and column families exist. Checks for existence of tables or creates them if
   * they do not exist. Also checks for existence of column families within each table and creates
   * them if they do not exist.
   *
   * @param tablesAndColumnFamilies
   *   A map of tables and column families. Keys are table names. Values are a list of column family
   *   names.
   */
  def ensureTables(
    bigtableOptions: BigtableOptions,
    tablesAndColumnFamilies: Map[String, Iterable[String]],
    createDisposition: TableAdmin.CreateDisposition
    tableId: String,
Since the underlying client is cached, simplify the API and do not worry if this is called multiple times.
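The caching idea in this comment can be sketched roughly as follows. This is only an illustration under stated assumptions, written in Java with the JDK alone: `FakeAdminClient` and `AdminClients` are hypothetical names, the "tables" live in memory, and the real Admin.Table and Admin.Instance objects in this PR may be structured differently.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a table admin client.
final class FakeAdminClient {
    private final Map<String, Set<String>> tables = new ConcurrentHashMap<>();

    // Idempotent: creates the table entry if missing, then adds any
    // column families that are not there yet.
    void ensureTable(String tableId, Iterable<String> columnFamilies) {
        Set<String> families =
            tables.computeIfAbsent(tableId, id -> ConcurrentHashMap.newKeySet());
        for (String family : columnFamilies) {
            families.add(family);
        }
    }

    // Returns the known column families, or an empty set for an unknown table.
    Set<String> families(String tableId) {
        return tables.getOrDefault(tableId, Collections.emptySet());
    }
}

// Hypothetical cache: one client per (projectId, instanceId) pair.
final class AdminClients {
    private static final Map<String, FakeAdminClient> CACHE = new ConcurrentHashMap<>();

    private AdminClients() {}

    // computeIfAbsent builds the client at most once per key, so repeated
    // calls are cheap and return the same instance.
    static FakeAdminClient get(String projectId, String instanceId) {
        return CACHE.computeIfAbsent(
            projectId + "/" + instanceId, key -> new FakeAdminClient());
    }
}
```

With a cache like this, a per-table `ensureTable` call can be repeated freely: each call reuses the same client, and re-ensuring an existing table only adds missing column families.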
}
final case class WriteParam private (
  flowControl: Boolean = WriteParam.DefaultFlowControl,
  errorHandler: ErrorHandler[BadRecord, _] = WriteParam.DefaultErrorHandler,
Fix #5440
Codecov Report
@@            Coverage Diff             @@
##             main    #5444      +/-   ##
==========================================
+ Coverage   61.29%   61.57%   +0.27%
==========================================
  Files         314      314
  Lines       11250    11213      -37
  Branches      793      789       -4
==========================================
+ Hits         6896     6904       +8
+ Misses       4354     4309      -45
Update all Bigtable APIs to use the Cloud v2 client.
Drop BigtableBulkWriter. This is TBD, but it looks to me like users can manually shard/groupBy and flatten to the normal IO if they require this feature.
Change the Bigtable ScioContext API to use Admin.Table and Admin.Instance objects that cache client instances.