본문 바로가기

카테고리 없음

14. GCP를 사용한 머신러닝 자동화

9. 구글 클라우드를 사용한 머신러닝 자동화

이번 글에서는 GCP 데이터 플랫폼에 대해서 설명한다. AWS가 인프라, 서버리스에 우수하고, GCP는 데이터 분석과 AI에 우수한 서비스를 제공한다. GCP를 사용한 일반적인 데이터분석의 절차는 

  • 빅쿼리 등 데이터웨어하우스 혹은 데이터레이크에서 필요한 데이터를 추출한다
  • 데이터플로우를 사용해서 전처리를 수행한다. 전처리된 결과는 데이터과학자의 특정 볼륨으로 복사한다
  • 파이프라인 자동화는 쿠베플로우로, 쿠베플로우 노트북 서버를 사용해서 분석을 수행한다.
 

디지털북스 쿠브플로우 이명환 지음, 비제이퍼블릭 구글 클라우드 플랫폼 뽀개기 박정운 지음을 인용하였다

데이터플로우의 장점을 설명하면,

  1. ETL을 개발하다보면 상당히 유사한 로직을 중복해서 개발하는 경우가 있다. 즉 소스와 타겟 스키마만 다르고, 변환로직이 약간 다른 파이프라인이 다수인 것을 확인할 수 있다. 빔은 소스코드 변경없이, 변환로직과 스키마의 변경이 가능하다. 그러므로 유사한 다수의 파이프라인을 빠르게 개발할 수 있다.
  2. 다양한 개발언어와 런타임환경을 지원한다. 개발자와 운영자에게 유연하고 종속적이지 않은 환경을 제공한다.
  3. Apache Beam를 한가지 기술을 사용해서 데이터 파이프라인과 머신러닝 파이프라인을 개발할 수 있다. 이 블로그의 소스에 다음 블로그에서 자세하게 설명한다.

개인적으로 빔을 좋아하지만, 스파크로 인해서 빔을 배제하는 경우가 많다. 그럼에도 불구하고 구글은 아주 다양하게 빔을 활용하고 있다. 데이터 전처리, ELT 와 ELT 유형의 데이터 변환, 오케스트레이션, 실시간 스트리밍 등의 사례를 볼 수 있다. 

 

데이터플로우의 구성요소는 아래와 같다. 스파크 스트리밍과 비교해서 빔은 유사한 개념과 용어를 사용한다.

PCollection

PCollection은 데이터플로우 파이프라인이 작동하는 분산 데이터세트를 나타낸다. 일반적으로 파이프라인은 데이터 원본에서 데이터를 읽어서 초기 PCollection을 생성하지만, 드라이버 프로그램 내의 메모리 내 데이터에서 PCollection을 만들 수도 있다. 거기에서 PCollection는 파이프라인의 각 단계에 대한 입력과 출력이다

 

Transforms
파이프라인에서 데이터 처리 작업 또는 단계를 나타낸다. 모든 PTransform은 하나 이상의 PCollection 객체를 입력으로 가져와, 해당 PCollection의 요소에 제공하는 처리 기능을 수행하고, 0개 이상의 출력 PCollection 객체를 생성한다. 데이터를 변경할 때 사용하는 부분으로 아래 형태를 지원한다.

  1. ParDO은 데이터를 변환하거나 추출 또는 연산 시 사용한다
  2. GroupByKey은 Key-Value 행태의 컬렉션을 Key 기준으로 Value를 묶어준다
  3. CoGroupByKey은 동일한 Key의 데이터세트가 여러개 있을 때 Key별로 묶어준다
  4. Combine은 데이터의 요소 및 값의 컬렉션을 결합하기 위해 사용한다
  5. Flatten은 여러 종류의 PCollection를 하나의 PCollection로 합친다
  6. Partition은 큰 PCollection을 분할할 때 사용한다

윈도우

스트리밍 데이터와 같은 경우에는 데이터가 끊이지 않고 들어오기 때문에 결과를 내보내야 하는 타이밍을 잡기 애매하다. 이를 위해서 시간을 기준으로 작업을 끊어서 처리하는데 이를 윈도잉 이라고 한다. 크게 Fixed Window와 Sliding Window 그리고 Session Window 라는 개념을 사용한다.

 

워터마크

실제 데이터들이 발생한 시간과 서버에 도착하는 시간에는 차이가 발생할 수 있기 때문에 어느 시점까지 데이터를 기다렸다가 처리해야 하는지에 대한 고민이 생길 수 있다. 이때 실제 데이터가 도착하는 시간을 예측해야 하는데 이를 워터마크라고 한다. 워터마크를 기반으로 윈도우의 시스템상 시작 시간과 종료 시간을 예측하게 된다.

 

트리거

트리거는 처리 중인 데이터를 언제 다음 단계로 넘길지를 결정하는 개념이다. 만약 윈도우 길이가 1시간이라면 실제 결과를 1시간 뒤에나 볼 수 있는데, 이 경우에는 실시간 데이터 분석이라고 보기 애매한 상황이 발생한다.

 

이때 윈도우가 끝나기 전에 중간 계산한 값들을 보여줄 수 있는데, 이때 이용하는 것이 트리거이다. 트리거링이 될때마다 전달되는 데이터는 과연 이전 데이터를 처리할까, 처리하지 않을까 하는 의문이 있을 수 있는데 이때 Accumulating mode 옵션을 이용하여 누적 여부를 선택할 수 있다.

[Output PCollection] = [Input PCollection] | [Label] >> [Transform]

[Output PCollection] = [Input PCollection] | [Transform]

 

파이프라인을 생성할 때 외부 소스(파일이나, 데이터베이스 등)로부터 데이터를 읽어오거나, 처리한 데이터를 외부로 저장할 때 Pipeline I/O를 이용하여 저장할 수 있다.

 

트리거는 윈도우의 중간 집계를 위해서 사용한다. AfterWatermark()는 워터마크가 윈도우의 끝을 통과할 때 트리거링 되는 트리거이다.

 

빔의 홈페이지에는 다양한 템플릿을 제공한다. 템플릿에는 구글스토리지, 빅쿼리, 카프카 등과 연계를 쉽게 할 수 있도록 도와준다. 이전 글에서 빔과 카프카 연계를 언급했는데, 이번 글에서는 GCS와 빅쿼리 연계에 대해서 다루도록 한다. 데이터분석 개발 시 유용한 2개 유형의 데이터플로우를 소개한다.

 

GCS에서 GCS로 적재

5–1

import argparse
import logging
import re
import apache_beam as beamclass ConvertJSON(beam.DoFn):
  def process(self, cols):
    head = 'Account_ID,Account_Name,Year,Month,YYYYMMDD,' \
        'Region,Product,Category,Usage_type,Description,Qty,Cost'.split(',')
    row = dict(zip(head, cols))
    # print(row)
    yield rowclass Ingestion(beam.DoFn):
  def process(self, element):
    ptn_instance_type = "((?:[a-z]{2,6}\.)?(?:[a-z]{1,2}\d{1})(?:[a-z]{1,2})?\.\w{2,9})"
    self.up_type1(element, 'Usage_type', ptn_instance_type, 'Instance_type')
    self.up_type1(element, 'Description', ptn_instance_type, 'Instance_type_2')
    self.up_type2(element, 'Instance_type', 'Normalization_factor_Usage_type')
    self.up_type2(element, 'Instance_type_2', 'Normalization_factor_Description')
    self.up_qty(element, 'Qty', 'Qty_720')
    self.up_cost_type(element, 'Cost_type')
    self.up_ec2os(element, 'Description', 'EC2_OS')
    self.up_dbeng(element, 100, 'Description', 'DB_engine1')
    self.up_dbeng(element, 2, 'Description', 'DB_engine2')
    self.up_datausage(element, 'Usage_type', 'Data_Usage_type')
    self.up_ebsusage(element, 'Usage_type', 'EBS_Usage_type')
    self.up_search(element, 'provisioned storage', 'provisioned', 'Description', 'EBS_provisioned')
    self.up_search(element, 'gp2,io1,st1,sc1', 'SSD(gp2),IOPS SSD(io1),HDD(st1),HDD(sc1)', 'Description', 'EBS_Volume_type')
    self.up_search(element,
                   'Reduced Redundancy Storage|Standard-Infrequent Access|Intelligent-Tiering, Frequent Access Tier|Glacier|One Zone-Infrequent',
                   'RRS|Standard-IA|Intelligent|Glacier|One Zone-IA',
                   'Description', 'S3_class', '|', 'Standard')
    self.up_search(element,
                   'GET|PUT, COPY, POST, or LIST|PUT, COPY, POST or LIST|transitions to Standard-Infrequent Access',
                   'GET|PUT. COPY. POST. or LIST|reqeust Standard-IA|transit Standard-IA',
                   'Description', 'S3_request', '|')
    self.up_search(element,
                   'DataScanned,Egress,Dashboard,VendedLog,CW:GMD-Metrics,CW:Requests,CW:AlarmMonitorUsage,CW:MetricMonitorUsage',
                   'DataScanned,S3-Egress,Dashboard,VendedLog,GMD-Metrics,Requests,AlarmMonitor,MetricMonitor',
                   'Usage_type', 'Cloudwatch_type')
    self.up_search(element,
                   'HTTP-Proxy,HTTPS-Proxy',
                   'HTTP-Proxy,HTTPS-Proxy',
                   'Usage_type', 'CF_request_type')
    self.up_search(element,
                   'ReadCapacityUnit,WriteCapacityUnit',
                   'Read,Write',
                   'Usage_type', 'Dynamo_capacity_unit')
    self.up_search(element,
                   'Lambda-GB,Request',
                   'Compute,Request',
                   'Usage_type', 'Lambda_type')
    self.up_search(element,
                   'ApiGatewayRequest,ApiGatewayCacheUsage',
                   'Api-Request,Api-Cache',
                   'Usage_type', 'ApiGateway_type')
    self.up_search(element,
                   'Fargate-GB,Fargate-vCPU,EC2-vCPU,EC2-GB',
                   'Memory(GB),vCPU,EC2-vCPU,EC2-Memory(GB)',
                   'Usage_type', 'Fargate_type')
    self.up_search(element,
                   'PI_API,Aurora:StorageIOUsage,Aurora:StorageUsage,Aurora:ServerlessUsage,Aurora:BackupUsage,RDS:Multi-AZ-PIOPS-Storage,RDS:Multi-AZ-PIOPS,RDS:PIOPS-Storage,RDS:PIOPS,RDS:StorageIOUsage,RDS:Multi-AZ-StorageUsage,RDS:Multi-AZ-GP2-Storage,RDS:GP2-Storage,RDS:StorageUsage',
                   'API requests,I/O requests,Storage usage,Serverless usage,Backup usage,Multi-AZ PIOPS storage,Multi-AZ-PIOPS,PIOS storage,PIOPS,I/O requests,Multi-AZ storage,Multi-AZ gp2 storage,gp2 storage,storage usage',
                   'Usage_type', 'DB_storage_type')
    # print(element)
    yield elementdef up_type1(self, element, source_key, pattern, target_key):
    result = re.findall(pattern, element.get(source_key))
    element[target_key] = result[0] if len(result) > 0 else ''def up_type2(self, element, source_key, target_key):
    target = element.get(source_key)
    instance_names = '32xl,24xl,18xl,16xl,12xl,10xl,9xl,8xl,6xl,4xl,3xl,2xl,xl,large,medium,small,micro,nano'.split(',')
    instance_factors = [1024, 768, 576, 512, 384, 320, 288, 256, 192, 128, 96, 64, 32, 16, 8, 4, 2, 1]
    for idx, name in enumerate(instance_names):
      if name in target:
        element[target_key] = instance_factors[idx]
        return True
    element[target_key] = 0def up_search(self, element, search_str, factor_str, source_key, target_key, delimiter=',', others=''):
    target = element.get(source_key)
    search = search_str.split(delimiter)
    factor = factor_str.split(delimiter)
    for idx, s in enumerate(search):
      if s in target:
        element[target_key] = factor[idx]
        return True
    element[target_key] = othersdef up_ec2os(self, element, source_key, target_key):
    target = element.get(source_key)
    oss = 'Linux,RHEL,SQL,Windows'.split(',')
    for idx, os in enumerate(oss):
      if os in target:
        element[target_key] = os
        return True
    element[target_key] = ''def up_dbeng(self, element, by, source_key, target_key):
    target = element.get(source_key)
    db_origin = 'MySQL,PostgreSQL,Aurora,MariaDB,SQL Server,Oracle'.split(',')
    db_factor = 'MySQL,PostgreSQL,Aurora,MariaDB,MS-SQL,Oracle'.split(',')
    for idx, db in enumerate(db_origin):
      if db in target and idx < by:
        element[target_key] = db_factor[idx]
        return True
    element[target_key] = ''def up_datausage(self, element, source_key, target_key):
    target = element.get(source_key)
    search = 'Out-Bytes,In-Bytes,Regional-Bytes'.split(',')
    for idx, s in enumerate(search):
      if s in target:
        element[target_key] = s
        # print(s)
        return True
    element[target_key] = ''def up_ebsusage(self, element, source_key, target_key):
    ptn_ebs = "S:([a-zA-Z0-9.]{2,20})"
    result = re.findall(ptn_ebs, element.get(source_key))
    element[target_key] = result[0] if len(result) > 0 else ''def up_qty(self, element, source_key, target_key):
    target = element.get(source_key)
    qty = 0
    try:
      qty = round(float(target) / 720)
    except ValueError:
      pass
    element[target_key] = qtydef up_cost_type(self, element, target_key):
    category = element.get('Category')
    usage = element.get('Usage_type')
    desc = element.get('Description')
    search_category = [
      'Reserved Instance Purchase',
      'Reserved Instance Monthly Prepayment',
      'Instance Usage',
      'Savings Plan Covered Usage',
      'Savings Plan Recurring Fee']
    factor_category = [
      '선납금',
      '미사용 RI',
      'OnDemand',
      'SP',
      '미사용SP']
    search_usage = ['Spot', 'Dedicatedusage']
    factor_usage = ['Spot', 'Dedicated']
    search_desc = ['reserved']
    factor_desc = ['RI']
    factor = ''
    for i, s in enumerate(search_category):
      if s in category:
        factor = factor_category[i]
        break
    if len(factor) < 1:
      for j, s in enumerate(search_usage):
        if s in usage:
          factor = factor_usage[j]
          break
    if len(factor) < 1:
      for k, s in enumerate(search_desc):
        if s in desc:
          factor = factor_desc[k]
          break
    element[target_key] = factorclass MapToString(beam.DoFn):
  def process(self, row):
    yield ','.join(str(x) for x in row.values())def run(argv=None):
  """The main function which creates the pipeline and runs it."""
  parser = argparse.ArgumentParser()parser.add_argument(
    '--input',
    dest='input',
    required=False,
    help='Input file to read. This can be a local file or a file in a Google Storage Bucket.',
    default=r'test_big.csv')parser.add_argument('--output',
            dest='output',
            required=False,
            help='output file.',
            default=r'output')job_name = 'test-bee-dataflow'
  DEST_DIR = "gs://proj-bee/"
  options = {
    'staging_location': DEST_DIR + 'staging',
    'temp_location': DEST_DIR + 'tmp',
    'job_name': job_name,
    'project': 'mzcdsc-team-200716',
    'region': 'asia-northeast1',
    'no_save_main_session': True,
    'save_main_session': False
  }known_args, pipeline_args = parser.parse_known_args(argv)# opts = beam.pipeline.PipelineOptions(flags=[], **options)
  opts = beam.pipeline.PipelineOptions()# p = beam.Pipeline('DataflowRunner', options=opts)
  p = beam.Pipeline('DirectRunner', options=opts)(p
   | 'Read File' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
   | 'Split Columns' >> beam.Map(lambda x: x.split(','))
   | 'Convert JSON' >> beam.ParDo(ConvertJSON())
   | 'Enrich Data' >> beam.ParDo(Ingestion())
   | 'Convert String' >> beam.ParDo(MapToString())
   | 'Save File' >> beam.io.WriteToText(known_args.output, num_shards=1, file_name_suffix='.csv',
                      header='Account_ID,Account_Name,Year,Month,YYYYMMDD,Region,Product,Category,Usage_type,Description,Qty,Cost,'
                             'Instance_type,Instance_type2,Normalization_factor_Usage_type,Normalization_factor_Description,'
                             'Qty_720,Cost_type,EC2_OS,DB_engine1,DB_engine2,Data_Usage_type,EBS_Usage_type,S3_class,S3_request,'
                             'Cloudwatch_type,CF_request_type,Dynamo_capacity_unit,Lambda_type,ApiGateway_type,Fargate_type,DB_storage_type')
   )result = p.run()
  result.wait_until_finish()if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  run()

GCS에서 빅쿼리로 적재

5–2

import argparse
import logging
import re
import os
import apache_beam as beam
from apache_beam.io.gcp.bigquery import parse_table_schema_from_jsonclass ConvertJSON(beam.DoFn):
  def process(self, cols):
    head = 'Account_ID,Account_Name,Year,Month,YYYYMMDD,Region,Product,Category,Usage_type,Description,Qty,Cost'.split(',')
    row = dict(zip(head, cols))
    yield rowclass Pipeline:
  def __init__(self):
    dir_path = os.path.dirname(os.path.realpath(__file__))
    self.schema_data = ''
    schema_file = os.path.join(dir_path, 'resources', 'schema.json')
    with open(schema_file) as f:
      data = f.read()
      self.schema_data = '{"fields": ' + data + '}'class Ingestion(beam.DoFn):
  def process(self, element):
    ptn_instance_type = "((?:[a-z]{2,6}\.)?(?:[a-z]{1,2}\d{1})(?:[a-z]{1,2})?\.\w{2,9})"
    self.up_type1(element, 'Usage_type', ptn_instance_type, 'Instance_type')
    self.up_type1(element, 'Description', ptn_instance_type, 'Instance_type_2')
    self.up_type2(element, 'Instance_type', 'Normalization_factor_Usage_type')
    self.up_type2(element, 'Instance_type_2', 'Normalization_factor_Description')
    self.up_qty(element, 'Qty', 'Qty_720')
    self.up_cost_type(element, 'Cost_type')
    self.up_search(element, 'Linux,RHEL,SQL,Windows', 'Linux,RHEL,SQL,Windows', 'Description', 'EC2_OS')
    self.up_dbeng(element, 100, 'Description', 'DB_engine1')
    self.up_dbeng(element, 2, 'Description', 'DB_engine2')
    self.up_search(element, 'Out-Bytes,In-Bytes,Regional-Bytes', 'Out-Bytes,In-Bytes,Regional-Bytes', 'Usage_type', 'Data_Usage_type')
    self.up_ebsusage(element, 'Usage_type', 'EBS_Usage_type')
    self.up_search(element,
                   'provisioned storage',
                   'provisioned',
                   'Description', 'EBS_provisioned')
    self.up_search(element,
                   'gp2,io1,st1,sc1',
                   'SSD(gp2),IOPS SSD(io1),HDD(st1),HDD(sc1)',
                   'Description', 'EBS_Volume_type')
    self.up_search(element,
                   'Reduced Redundancy Storage|Standard-Infrequent Access|Intelligent-Tiering, Frequent Access Tier|Glacier|One Zone-Infrequent',
                   'RRS|Standard-IA|Intelligent|Glacier|One Zone-IA',
                   'Description', 'S3_class', '|', 'Standard')
    self.up_search(element,
                   'GET|PUT, COPY, POST, or LIST|PUT, COPY, POST or LIST|transitions to Standard-Infrequent Access',
                   'GET|PUT, COPY, POST, or LIST|reqeust Standard-IA|transit Standard-IA',
                   'Description', 'S3_request', '|')
    self.up_search(element,
                   'DataScanned,Egress,Dashboard,VendedLog,CW:GMD-Metrics,CW:Requests,CW:AlarmMonitorUsage,CW:MetricMonitorUsage',
                   'DataScanned,S3-Egress,Dashboard,VendedLog,GMD-Metrics,Requests,AlarmMonitor,MetricMonitor',
                   'Usage_type', 'Cloudwatch_type')
    self.up_search(element,
                   'HTTP-Proxy,HTTPS-Proxy',
                   'HTTP-Proxy,HTTPS-Proxy',
                   'Usage_type', 'CF_request_type')
    self.up_search(element,
                   'ReadCapacityUnit,WriteCapacityUnit',
                   'Read,Write',
                   'Usage_type', 'Dynamo_capacity_unit')
    self.up_search(element,
                   'Lambda-GB,Request',
                   'Compute,Request',
                   'Usage_type', 'Lambda_type')
    self.up_search(element,
                   'ApiGatewayRequest,ApiGatewayCacheUsage',
                   'Api-Request,Api-Cache',
                   'Usage_type', 'ApiGateway_type')
    self.up_search(element,
                   'Fargate-GB,Fargate-vCPU,EC2-vCPU,EC2-GB',
                   'Memory(GB),vCPU,EC2-vCPU,EC2-Memory(GB)',
                   'Usage_type', 'Fargate_type')
    self.up_search(element,
                   'PI_API,Aurora:StorageIOUsage,Aurora:StorageUsage,Aurora:ServerlessUsage,Aurora:BackupUsage,RDS:Multi-AZ-PIOPS-Storage,RDS:Multi-AZ-PIOPS,RDS:PIOPS-Storage,RDS:PIOPS,RDS:StorageIOUsage,RDS:Multi-AZ-StorageUsage,RDS:Multi-AZ-GP2-Storage,RDS:GP2-Storage,RDS:StorageUsage',
                   'API requests,I/O requests,Storage usage,Serverless usage,Backup usage,Multi-AZ PIOPS storage,Multi-AZ-PIOPS,PIOS storage,PIOPS,I/O requests,Multi-AZ storage,Multi-AZ gp2 storage,gp2 storage,storage usage',
                   'Usage_type', 'DB_storage_type')
    yield elementdef up_type1(self, element, source_key, pattern, target_key):
    result = re.findall(pattern, element.get(source_key))
    element[target_key] = result[0] if len(result) > 0 else ''def up_type2(self, element, source_key, target_key):
    target = element.get(source_key)
    instance_names = '32xl,24xl,18xl,16xl,12xl,10xl,9xl,8xl,6xl,4xl,3xl,2xl,xl,large,medium,small,micro,nano'.split(',')
    instance_factors = [1024, 768, 576, 512, 384, 320, 288, 256, 192, 128, 96, 64, 32, 16, 8, 4, 2, 1]
    for idx, name in enumerate(instance_names):
      if name in target:
        element[target_key] = instance_factors[idx]
        return True
    element[target_key] = 0def up_search(self, element, search_str, factor_str, source_key, target_key, delimiter=',', others=''):
    target = element.get(source_key)
    search = search_str.split(delimiter)
    factor = factor_str.split(delimiter)
    for idx, s in enumerate(search):
      if s in target:
        element[target_key] = factor[idx]
        return True
    element[target_key] = othersdef up_ec2os(self, element, source_key, target_key):
    target = element.get(source_key)
    oss = 'Linux,RHEL,SQL,Windows'.split(',')
    for idx, os in enumerate(oss):
      if os in target:
        element[target_key] = os
        return True
    element[target_key] = ''def up_dbeng(self, element, by, source_key, target_key):
    target = element.get(source_key)
    db_origin = 'MySQL,PostgreSQL,Aurora,MariaDB,SQL Server,Oracle'.split(',')
    db_factor = 'MySQL,PostgreSQL,Aurora,MariaDB,MS-SQL,Oracle'.split(',')
    for idx, db in enumerate(db_origin):
      if db in target and idx < by:
        element[target_key] = db_factor[idx]
        return True
    element[target_key] = ''def up_datausage(self, element, source_key, target_key):
    target = element.get(source_key)
    search = 'Out-Bytes,In-Bytes,Regional-Bytes'.split(',')
    for idx, s in enumerate(search):
      if s in target:
        element[target_key] = s
        return True
    element[target_key] = ''def up_ebsusage(self, element, source_key, target_key):
    ptn_ebs = "S:([a-zA-Z0-9.]{2,20})"
    result = re.findall(ptn_ebs, element.get(source_key))
    element[target_key] = result[0] if len(result) > 0 else ''def up_qty(self, element, source_key, target_key):
    target = element.get(source_key)
    qty = 0
    try:
      qty = round(float(target) / 720)
    except ValueError:
      pass
    element[target_key] = qtydef up_cost_type(self, element, target_key):
    category = element.get('Category')
    usage = element.get('Usage_type')
    desc = element.get('Description')
    search_category = [
      'Reserved Instance Purchase',
      'Reserved Instance Monthly Prepayment',
      'Instance Usage',
      'Savings Plan Covered Usage',
      'Savings Plan Recurring Fee']
    factor_category = [
      '선납금',
      '미사용 RI',
      'OnDemand',
      'SP',
      '미사용SP']
    search_usage = ['Spot', 'Dedicatedusage']
    factor_usage = ['Spot', 'Dedicated']
    search_desc = ['reserved']
    factor_desc = ['RI']
    factor = ''
    for i, s in enumerate(search_category):
      if s in category:
        factor = factor_category[i]
        break
    if len(factor) < 1:
      for j, s in enumerate(search_usage):
        if s in usage:
          factor = factor_usage[j]
          break
    if len(factor) < 1:
      for k, s in enumerate(search_desc):
        if s in desc:
          factor = factor_desc[k]
          break
    element[target_key] = factordef run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        dest='input',
                        default='gs://bucket/input.csv')
    parser.add_argument('--output',
                        dest='output',
                        default='dataset.table')
    job_name = 'Pipeline Type 1'
    DEST_DIR = "gs://bucket/"
    known_args, pipeline_args = parser.parse_known_args(argv)
    options = {
        'staging_location': DEST_DIR + 'staging',
        'temp_location': DEST_DIR + 'tmp',
        'job_name': job_name,
        'project': 'project', 
        'region': 'region',
        'no_save_main_session': True ,  
        'save_main_session': False
    }
    opts = beam.pipeline.PipelineOptions(flags=[], **options)p = beam.Pipeline('DataflowRunner', options=opts)
    # p = beam.Pipeline('DirectRunner', options=opts)pipeline = Pipeline()
    schema = parse_table_schema_from_json(pipeline.schema_data)(p
      | 'Read File' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
      | 'Split Columns' >> beam.Map(lambda x: x.split(','))
      | 'Convert JSON' >> beam.ParDo(ConvertJSON())
      | 'Enrich Data' >> beam.ParDo(Ingestion())
      | 'Write to BigQuery' >> beam.io.Write(
         beam.io.BigQuerySink(
           known_args.output,
           schema=schema,
           create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
    )result = p.run()
    result.wait_until_finish()if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()

실행 시 아래와 같은 로그를 확인한다

5–3

(base) C:\>python dataflow.py
INFO:apache_beam.runners.portability.fn_api_runner.translations:================
==== <function annotate_downstream_side_inputs at 0x000000000891F670> ==========
==========
INFO:apache_beam.runners.portability.fn_api_runner.translations:================
==== <function fix_side_input_pcoll_coders at 0x000000000891F790> ==============
======
INFO:apache_beam.runners.portability.fn_api_runner.translations:================
==== <function pack_combiners at 0x000000000891FC10> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:================
==== <function lift_combiners at 0x000000000891FD30> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:================
==== <function expand_sdf at 0x000000000891FEE0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:================
==== <function expand_gbk at 0x000000000891FF70> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:================
==== <function sink_flattens at 0x00000000089200D0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:================
==== <function greedily_fuse at 0x0000000008920160> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:================
==== <function read_to_impulse at 0x00000000089201F0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:================
==== <function impulse_to_input at 0x0000000008920280> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:================
==== <function sort_stages at 0x00000000089204C0> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:================
==== <function setup_timer_mapping at 0x0000000008920430> ====================
INFO:apache_beam.runners.portability.fn_api_runner.translations:================
==== <function populate_data_channel_coders at 0x0000000008920550> =============
=======
INFO:apache_beam.runners.worker.statecache:Creating state cache with size 100
INFO:apache_beam.runners.portability.fn_api_runner.worker_handlers:Created Worke
r handler <apache_beam.runners.portability.fn_api_runner.worker_handlers.Embedde
dWorkerHandler object at 0x0000000008A52CA0> for environment ref_Environment_def
ault_environment_1 (beam:env:embedded_python:v1, b'')
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((((ref_App
liedPTransform_Read Lines from File/Read/Impulse_4)+(ref_AppliedPTransform_Read
Lines from File/Read/Map(<lambda at iobase.py:899>)_5))+(Read Lines from File/Re
ad/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/PairWithRestriction))+(Rea
d Lines from File/Read/SDFBoundedSourceReader/ParDo(SDFBoundedSourceDoFn)/SplitA
ndSizeRestriction))+(ref_PCollection_PCollection_2_split/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((((((((ref
_PCollection_PCollection_2_split/Read)+(Read Lines from File/Read/SDFBoundedSour
ceReader/ParDo(SDFBoundedSourceDoFn)/Process))+(ref_AppliedPTransform_Split Colu
mns from Line_8))+(ref_AppliedPTransform_Convert JSON_9))+(ref_AppliedPTransform
_Enrich Data_10))+(ref_AppliedPTransform_Convert to String_11))+(ref_AppliedPTra
nsform_Save To Local/Write/WriteImpl/Map(<lambda at iobase.py:1080>)_21))+(ref_A
ppliedPTransform_Save To Local/Write/WriteImpl/WindowInto(WindowIntoFn)_22))+(Sa
ve To Local/Write/WriteImpl/GroupByKey/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (((((ref_Ap
pliedPTransform_Save To Local/Write/WriteImpl/DoOnce/Impulse_16)+(ref_AppliedPTr
ansform_Save To Local/Write/WriteImpl/DoOnce/FlatMap(<lambda at core.py:2963>)_1
7))+(ref_AppliedPTransform_Save To Local/Write/WriteImpl/DoOnce/Map(decode)_19))
+(ref_AppliedPTransform_Save To Local/Write/WriteImpl/InitializeWrite_20))+(ref_
PCollection_PCollection_10/Write))+(ref_PCollection_PCollection_11/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((Save To L
ocal/Write/WriteImpl/GroupByKey/Read)+(ref_AppliedPTransform_Save To Local/Write
/WriteImpl/WriteBundles_24))+(ref_PCollection_PCollection_15/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running ((ref_PColl
ection_PCollection_10/Read)+(ref_AppliedPTransform_Save To Local/Write/WriteImpl
/PreFinalize_25))+(ref_PCollection_PCollection_16/Write)
INFO:apache_beam.runners.portability.fn_api_runner.fn_runner:Running (ref_PColle
ction_PCollection_10/Read)+(ref_AppliedPTransform_Save To Local/Write/WriteImpl/
FinalizeWrite_26)
INFO:apache_beam.io.filebasedsink:Starting finalize_write threads with num_shard
s: 1 (skipped: 0), batches: 1, num_threads: 1
INFO:apache_beam.io.filebasedsink:Renamed 1 shards in 0.01 seconds.(base) C:\>

 

쿠베플로우 구성요소

Dataflow 혹은 Pandas를 사용해서 전처리가 완료되면, 완성된 데이터 세트를 사용해서 분석작업을 시작할 수 있다.

쿠베플로우 설치가 복잡하므로 간단하게 사용할 수 있는 Vagrant를 소개한다. 이를 통해 쿠베플로우를 쉽게 설치 및 구성할 수 있다. Vagrantfile은 아래와 같다

# -*- mode: ruby -*-
# vi: set ft=ruby :

# All Vagrant configuration is done below. The "2" in Vagrant.configure
# configures the configuration version (we support older styles for
# backwards compatibility). Please don't change it unless you know what
# you're doing.
Vagrant.configure("2") do |config|
  # The most common configuration options are documented and commented below.
  # For a complete reference, please see the online documentation at
  # https://docs.vagrantup.com.
  config.vm.provider "virtualbox" do |vb|
       vb.cpus = 8
       vb.memory = 29000
  end
  # Every Vagrant development environment requires a box. You can search for
  # boxes at https://vagrantcloud.com/search.
  config.vm.box = "arrikto/minikf"

  # Disable automatic box update checking. If you disable this, then
  # boxes will only be checked for updates when the user runs
  # `vagrant box outdated`. This is not recommended.
  # config.vm.box_check_update = false

  # Create a forwarded port mapping which allows access to a specific port
  # within the machine from a port on the host machine. In the example below,
  # accessing "localhost:8080" will access port 80 on the guest machine.
  # NOTE: This will enable public access to the opened port
  # config.vm.network "forwarded_port", guest: 80, host: 8080

  # Create a forwarded port mapping which allows access to a specific port
  # within the machine from a port on the host machine and only allow access
  # via 127.0.0.1 to disable public access
  # config.vm.network "forwarded_port", guest: 80, host: 8080, host_ip: "127.0.0.1"

  # Create a private network, which allows host-only access to the machine
  # using a specific IP.
  # config.vm.network "private_network", ip: "192.168.33.10"

  # Create a public network, which generally matched to bridged network.
  # Bridged networks make the machine appear as another physical device on
  # your network.
  # config.vm.network "public_network"

  # Share an additional folder to the guest VM. The first argument is
  # the path on the host to the actual folder. The second argument is
  # the path on the guest to mount the folder. And the optional third
  # argument is a set of non-required options.
  # config.vm.synced_folder "../data", "/vagrant_data"

  # Provider-specific configuration so you can fine-tune various
  # backing providers for Vagrant. These expose provider-specific options.
  # Example for VirtualBox:
  #
  # config.vm.provider "virtualbox" do |vb|
  #   # Display the VirtualBox GUI when booting the machine
  #   vb.gui = true
  #
  #   # Customize the amount of memory on the VM:
  #   vb.memory = "1024"
  # end
  #
  # View the documentation for the provider you are using for more
  # information on available options.

  # Enable provisioning with a shell script. Additional provisioners such as
  # Ansible, Chef, Docker, Puppet and Salt are also available. Please see the
  # documentation for more information about their specific syntax and use.
  # config.vm.provision "shell", inline: <<-SHELL
  #   apt-get update
  #   apt-get install -y apache2
  # SHELL
end

http://10.10.10.10 에서 추가 구성 작업을 진행한다

C:\Users\EUROPE>vagrant up
==> vagrant: A new version of Vagrant is available: 2.2.19 (installed version: 2.2.18)!
==> vagrant: To upgrade visit: https://www.vagrantup.com/downloads.html

Bringing machine 'default' up with 'virtualbox' provider...
==> default: Box 'arrikto/minikf' could not be found. Attempting to find and install...
    default: Box Provider: virtualbox
    default: Box Version: >= 0
==> default: Loading metadata for box 'arrikto/minikf'
    default: URL: https://vagrantcloud.com/arrikto/minikf
==> default: Adding box 'arrikto/minikf' (v20210428.0.1) for provider: virtualbox
    default: Downloading: https://vagrantcloud.com/arrikto/boxes/minikf/versions/20210428.0.1/providers/virtualbox.box
Download redirected to host: storage.googleapis.com
    default:
==> default: Successfully added box 'arrikto/minikf' (v20210428.0.1) for 'virtualbox'!
==> default: Using C:/Users/EUROPE/minikf-user-data.vdi for persistent storage.
==> default: Importing base box 'arrikto/minikf'...
==> default: Generating MAC address for NAT networking...
==> default: Using C:/Users/EUROPE/minikf-user-data.vdi for persistent storage.
==> default: Checking if box 'arrikto/minikf' version '20210428.0.1' is up to date...
==> default: Setting the name of the VM: EUROPE_default_1641818037229_97223
==> default: Clearing any previously set network interfaces...
==> default: Preparing network interfaces based on configuration...
    default: Adapter 1: nat
    default: Adapter 2: hostonly
==> default: Forwarding ports...
    default: 32123 (guest) => 32123 (host) (adapter 1)
    default: 22 (guest) => 2222 (host) (adapter 1)
==> default: Using C:/Users/EUROPE/minikf-user-data.vdi for persistent storage.
==> default: Running 'pre-boot' VM customizations...
==> default: Using C:/Users/EUROPE/minikf-user-data.vdi for persistent storage.
==> default: ** Creating persistent storage **
==> default: Using C:/Users/EUROPE/minikf-user-data.vdi for persistent storage.
==> default: ** Attaching persistent storage **
==> default: Booting VM...
==> default: Waiting for machine to boot. This may take a few minutes...
    default: SSH address: 127.0.0.1:2222
    default: SSH username: vagrant
    default: SSH auth method: private key
    default:
    default: Vagrant insecure key detected. Vagrant will automatically replace
    default: this with a newly generated keypair for better security.
    default:
    default: Inserting generated public key within guest...
    default: Removing insecure key from the guest if it's present...
    default: Key inserted! Disconnecting and reconnecting using new SSH key...
==> default: Machine booted and ready!
==> default: Checking for guest additions in VM...
    default: The guest additions on this VM do not match the installed version of
    default: VirtualBox! In most cases this is fine, but in rare cases it can
    default: prevent things such as shared folders from working properly. If you see
    default: shared folder errors, please make sure the guest additions within the
    default: virtual machine match the version of VirtualBox you have installed on
    default: your host and reload your VM.
    default:
    default: Guest Additions Version: 5.2.42_Ubuntu r137960
    default: VirtualBox Version: 6.1
==> default: Using C:/Users/EUROPE/minikf-user-data.vdi for persistent storage.
==> default: ** Managing persistent storage **
==> default: Setting hostname...
==> default: Configuring and enabling network interfaces...
==> default: Mounting shared folders...
    default: /vagrant => C:/Users/EUROPE

==> default: Machine 'default' has a post `vagrant up` message. This is a message
==> default: from the creator of the Vagrantfile, and not from Vagrant itself:
==> default:
==> default:     Welcome to MiniKF!
==> default:     Visit http://10.10.10.10/ to get started.
==> default:

C:\Users\EUROPE>

노트북 서버

노트북 서버는 쿠버네티스 위에서 실행되는 주피터 노트북 서버이다. 쿠버네티스에서 리소스를 스케줄링하기 때문에 사용자는 노트북의 설정만으로도 간단히 노트북 할당을 받을 수 있다. 일반 노트북에서는 대용량 데이터 분석이 쉽지 않다. 다수 데이터 과학자가 노트북을 사용해서 대용량 전처리를 할 경우에, 메모리 GPU 등 자원이 부족하는 문제가 발생하고, 전체 시스템이 다운되는 경우가 발생할 수 있다. 노트북 서버를 통해서 정해진 자원을 할당받고, 다른 작업자와 충돌없이 보안이 보장되는 샌드박스를 구축할 수 있다. 대용량 파일이거나 빠른 처리를 위해서 쿠베플로우 노트북을 사용함으로써 대용량 데이터를 손쉽게 전처리 및 분석할 수 있다. 

 

페어링
페어링은 주피터 노트북, 파이썬 파일을 도커 이미지로 빌드한다. 이미지가 빌드되면 설정한 도커 레지스트리에 푸시한다. 푸시가 완료되면 설정한 배포 리소스 타입에 따라 쿠버네티스 잡, KFServing 등의 리소스로 변환하여 쿠버네티스 API 서버로 요청을 한다. 이 과정을 페어링 패키지는 크게 preprocessor, builder, deployer 구조로 나누어서 실행합니다.

  1. preprocessor: 작성된 코드를 도커이미지에 넣을 수 있도록 패키지화
  2. builder: 패키지된 파일을 도커 이미지화
  3. deployer: 생성된 이미지를 쿠버네티스 클러스터에 배포

파이프라인

파이프라인은 컨테이너 기반의 워크플로우를 만들고 배포할 수 있는 쿠버네티스 플랫폼이다. 컨테이너 기반으로 구성되어 있기 때문에 확장성 및 재사용성이 좋고 쿠베플로우의 대표적인 플랫폼이다.

기본적인 파이프라인 템플릿은 아래와 같다.

 

5–4

import kfp
import kfp.dsl as dsl
from kfp import compiler
from kfp import components# Create a python function@dsl.python_component(
    name='add_op',
    description='adds two numbers',
    base_image=BASE_IMAGE  # you can define the base image here, or when you build in the next step. 
)
def add(a: float, b: float) -> float:
    '''Calculates sum of two arguments'''
    print(a, '+', b, '=', a + b)
    return a + b# Convert the function to a pipeline operation.
add_op = components.func_to_container_op(
    add,
    base_image=BASE_IMAGE, 
)# Build a pipeline using the component
@dsl.pipeline(
   name='Calculation pipeline',
   description='A toy pipeline that performs arithmetic calculations.'
)
def calc_pipeline(
   a: float =0,
   b: float =7
):
    #Passing pipeline parameter and a constant value as operation arguments
    add_task = add_op(a, 4) #Returns a dsl.ContainerOp class instance. 
    #You can create explicit dependency between the tasks using xyz_task.after(abc_task)
    add_2_task = add_op(a, b)
    add_3_task = add_op(add_task.output, add_2_task.output)# Specify pipeline argument values
arguments = {'a': '7', 'b': '8'}
# Launch a pipeline run given the pipeline function definition
kfp.Client().create_run_from_pipeline_func(calc_pipeline, arguments=arguments, 
                                           experiment_name=EXPERIMENT_NAME)
# The generated links below lead to the Experiment page and the pipeline run details page, respectively# Compile the pipeline
pipeline_func = calc_pipeline
pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)# Get or create an experiment
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)# Specify pipeline argument values
arguments = {'a': '7', 'b': '8'}# Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)

파이프라인은 아래와 같이 구성되어 있다

  1. 실험(Experiment), 잡(Job), 런(Run)을 추적하고 관리하는 유저 인터페이스
  2. 머신러닝 워크플로우 단계별 스케줄링 엔진
  3. 파이프라인과 그 컴포넌트들을 생성하는 SDK
  4. SDK와 연동하는 주피터 노트북

택시 운행 시 얼마의 팁을 주는지 예측하는 파이프라인은 아래와 같다

  1. TFT에 아파치 빔을 사용하였고, 쿠베플로우 파이프라인을 구성
  2. 텐서플로우를 사용해서 학습과 모델 생성

5–5

아래의 소스는 빔을 TFT 변환 등 일부에서 사용하였으며, 전체적인 로직은 쿠베플로우 파이프라인으로 구현되었다.

import os
import shutil
import logging
import apache_beam as beam
import tensorflow as tf
import tensorflow_transform as tft
import tensorflow_model_analysis as tfma
import tensorflow_data_validation as tfdvfrom apache_beam.io import textio
from apache_beam.io import tfrecordiofrom tensorflow_transform.beam import impl as beam_impl
from tensorflow_transform.beam.tft_beam_io import transform_fn_io
from tensorflow_transform.coders.csv_coder import CsvCoder
from tensorflow_transform.coders.example_proto_coder import ExampleProtoCoder
from tensorflow_transform.tf_metadata import dataset_metadata
from tensorflow_transform.tf_metadata import metadata_io
DATA_DIR = 'data/'
TRAIN_DATA = os.path.join(DATA_DIR, 'taxi-cab-classification/train.csv')
EVALUATION_DATA = os.path.join(DATA_DIR, 'taxi-cab-classification/eval.csv')# Categorical features are assumed to each have a maximum value in the dataset.
MAX_CATEGORICAL_FEATURE_VALUES = [24, 31, 12]
CATEGORICAL_FEATURE_KEYS = ['trip_start_hour', 'trip_start_day', 'trip_start_month']DENSE_FLOAT_FEATURE_KEYS = ['trip_miles', 'fare', 'trip_seconds']# Number of buckets used by tf.transform for encoding each feature.
FEATURE_BUCKET_COUNT = 10BUCKET_FEATURE_KEYS = ['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude']# Number of vocabulary terms used for encoding VOCAB_FEATURES by tf.transform
VOCAB_SIZE = 1000# Count of out-of-vocab buckets in which unrecognized VOCAB_FEATURES are hashed.
OOV_SIZE = 10VOCAB_FEATURE_KEYS = ['pickup_census_tract', 'dropoff_census_tract', 'payment_type', 'company',
    'pickup_community_area', 'dropoff_community_area']# 아래 피처에 NaN 값을 허용한다
OPTIONAL_FEATURES = ['dropoff_latitude', 'dropoff_longitude', 'pickup_census_tract', 'dropoff_census_tract',
    'company', 'trip_seconds', 'dropoff_community_area']LABEL_KEY = 'tips'
FARE_KEY = 'fare'# 파라미터 학습
EPOCHS = 1
STEPS = 3
BATCH_SIZE = 32
HIDDEN_LAYER_SIZE = '1500'
LEARNING_RATE = 0.1tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO)
# tf.get_logger().setLevel(logging.ERROR)# 데이터 검증
vldn_output = os.path.join(DATA_DIR, 'validation')# TODO: Understand why this was used in the conversion to the output json
# key columns: list of the names for columns that should be treated as unique keys.
key_columns = ['trip_start_timestamp']# read the first line of the cvs to have and ordered list of column names 
# (the Schema will scrable the features)
with open(TRAIN_DATA) as f:
    column_names = f.readline().strip().split(',')
# infer_schema함수는 Protocol buffer를 생성
stats = tfdv.generate_statistics_from_csv(data_location=TRAIN_DATA)
schema = tfdv.infer_schema(stats)eval_stats = tfdv.generate_statistics_from_csv(data_location=EVALUATION_DATA)
anomalies = tfdv.validate_statistics(eval_stats, schema)# 이상치를 기록
for feature_name, anomaly_info in anomalies.anomaly_info.items():
    logging.getLogger().error(
        'Anomaly in feature "{}": {}'.format(
            feature_name, anomaly_info.description))
    
# 추론 스키마를 출력
tfdv.display_schema(schema=schema)# 이상치를 해결
company = tfdv.get_feature(schema, 'company')
company.distribution_constraints.min_domain_mass = 0.9# payment_type 특성에 새로운 값을 추가
payment_type_domain = tfdv.get_domain(schema, 'payment_type')
payment_type_domain.value.append('Prcard')# 스키마를 갱신후에 평가 통계를 검증
updated_anomalies = tfdv.validate_statistics(eval_stats, schema)
tfdv.display_anomalies(updated_anomalies)# 데이터 변환
def to_dense(tensor):
    """Takes as input a SparseTensor and return a Tensor with correct default value
    Args:
      tensor: tf.SparseTensor
    Returns:
      tf.Tensor with default value
    """
    if not isinstance(tensor, tf.sparse.SparseTensor):
        return tensor
    if tensor.dtype == tf.string:
        default_value = ''
    elif tensor.dtype == tf.float32:
        default_value = 0.0
    elif tensor.dtype == tf.int32:
        default_value = 0
    else:
        raise ValueError(f"Tensor type not recognized: {tensor.dtype}")return tf.squeeze(tf.sparse_to_dense(tensor.indices,
                               [tensor.dense_shape[0], 1],
                               tensor.values, default_value=default_value), axis=1)
# TODO: Update to below version
# return tf.squeeze(tf.sparse.to_dense(tensor, default_value=default_value), axis=1)def preprocess_fn(inputs):
    """tf.transform's callback function for preprocessing inputs.
    Args:
      inputs: map from feature keys to raw not-yet-transformed features.
    Returns:
      Map from string feature key to transformed feature operations.
    """
    outputs = {}
    for key in DENSE_FLOAT_FEATURE_KEYS:
# Preserve this feature as a dense float, setting nan's to the mean.
        outputs[key] = tft.scale_to_z_score(to_dense(inputs[key]))for key in VOCAB_FEATURE_KEYS:
# Build a vocabulary for this feature.
        if inputs[key].dtype == tf.string:
            vocab_tensor = to_dense(inputs[key])
        else:
            vocab_tensor = tf.as_string(to_dense(inputs[key]))
        outputs[key] = tft.compute_and_apply_vocabulary(
            vocab_tensor, vocab_filename='vocab_' + key,
            top_k=VOCAB_SIZE, num_oov_buckets=OOV_SIZE)for key in BUCKET_FEATURE_KEYS:
        outputs[key] = tft.bucketize(to_dense(inputs[key]), FEATURE_BUCKET_COUNT)for key in CATEGORICAL_FEATURE_KEYS:
        outputs[key] = tf.cast(to_dense(inputs[key]), tf.int64)taxi_fare = to_dense(inputs[FARE_KEY])
    taxi_tip = to_dense(inputs[LABEL_KEY])
# 만약 팁이 요금의 20% 이상인지를 테스트
    tip_threshold = tf.multiply(taxi_fare, tf.constant(0.2))
    outputs[LABEL_KEY] = tf.logical_and(
        tf.logical_not(tf.math.is_nan(taxi_fare)),
        tf.greater(taxi_tip, tip_threshold))for key in outputs:
        if outputs[key].dtype == tf.bool:
            outputs[key] = tft.compute_and_apply_vocabulary(tf.as_string(outputs[key]),
                                             vocab_filename='vocab_' + key)
    
    return outputs# 아파치 빔 DirectRunner 로 실행
trns_output = os.path.join(DATA_DIR, "transformed")
if os.path.exists(trns_output):
    shutil.rmtree(trns_output)tft_input_metadata = dataset_metadata.DatasetMetadata(schema)runner = 'DirectRunner'
with beam.Pipeline(runner, options=None) as p:
    with beam_impl.Context(temp_dir=os.path.join(trns_output, 'tmp')):
        converter = CsvCoder(column_names, tft_input_metadata.schema)# 훈련 데이터를 읽음
        train_data = (
                p
                | 'ReadTrainData' >> textio.ReadFromText(TRAIN_DATA, skip_header_lines=1)
                | 'DecodeTrainData' >> beam.Map(converter.decode))# 훈련 데이터를 변환 (and get transform_fn function)
        transformed_dataset, transform_fn = (
                (train_data, tft_input_metadata) | beam_impl.AnalyzeAndTransformDataset(preprocess_fn))
        transformed_data, transformed_metadata = transformed_dataset# 변환된 훈련 데이터를 저장
        _ = transformed_data | 'WriteTrainData' >> tfrecordio.WriteToTFRecord(
            os.path.join(trns_output, 'train'),
            coder=ExampleProtoCoder(transformed_metadata.schema))# 평가된 데이터를 읽음
        eval_data = (
                p
                | 'ReadEvalData' >> textio.ReadFromText(EVALUATION_DATA, skip_header_lines=1)
                | 'DecodeEvalData' >> beam.Map(converter.decode))# 평가된 데이터를 변환 (using previously created transform_fn function)
        eval_dataset = (eval_data, tft_input_metadata)
        transformed_eval_data, transformed_metadata = (
            (eval_dataset, transform_fn) | beam_impl.TransformDataset())# 평가된 데이터를 저장
        _ = transformed_eval_data | 'WriteEvalData' >> tfrecordio.WriteToTFRecord(
            os.path.join(trns_output, 'eval'),
            coder=ExampleProtoCoder(transformed_metadata.schema))# 추후 사용을 위해서 transform_fn 함수를 저장
        # TODO: check out what is the transform function (transform_fn) that came from previous step
        _ = (transform_fn | 'WriteTransformFn' >> transform_fn_io.WriteTransformFn(trns_output))# 변환된 메타데이터를 저장
        metadata_io.write_metadata(
            metadata=tft_input_metadata,
            path=os.path.join(trns_output, 'metadata'))# 훈련
def training_input_fn(transformed_output, transformed_examples, batch_size, target_name):
    """
    Args:
      transformed_output: tft.TFTransformOutput
      transformed_examples: Base filename of examples
      batch_size: Batch size.
      target_name: name of the target column.
    Returns:
      The input function for training or eval.
    """
    dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=transformed_examples,
        batch_size=batch_size,
        features=transformed_output.transformed_feature_spec(),
        reader=tf.data.TFRecordDataset,
        shuffle=True)
    transformed_features = dataset.make_one_shot_iterator().get_next()
    transformed_labels = transformed_features.pop(target_name)
    return transformed_features, transformed_labelsdef get_feature_columns():
    """Callback that returns a list of feature columns for building a tf.estimator.
    Returns:
      A list of tf.feature_column.
    """
    return (
            [tf.feature_column.numeric_column(key, shape=()) for key in DENSE_FLOAT_FEATURE_KEYS] +
            [tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_identity(key, num_buckets=VOCAB_SIZE + OOV_SIZE)) for key in VOCAB_FEATURE_KEYS] +
            [tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_identity(key, num_buckets=FEATURE_BUCKET_COUNT, default_value=0)) for key in BUCKET_FEATURE_KEYS] +
            [tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_identity(key, num_buckets=num_buckets, default_value=0)) for key, num_buckets in zip(CATEGORICAL_FEATURE_KEYS, MAX_CATEGORICAL_FEATURE_VALUES)]
    )training_output = os.path.join(DATA_DIR, "training")
if os.path.exists(training_output):
    shutil.rmtree(training_output)hidden_layer_size = [int(x.strip()) for x in HIDDEN_LAYER_SIZE.split(',')]tf_transform_output = tft.TFTransformOutput(trns_output)# Set how often to run checkpointing in terms of steps.
config = tf.estimator.RunConfig(save_checkpoints_steps=1000)
n_classes = tf_transform_output.vocabulary_size_by_name("vocab_" + LABEL_KEY)
# Estimator 생성
estimator =  tf.estimator.DNNClassifier(
                feature_columns=get_feature_columns(),
                hidden_units=hidden_layer_size,
                n_classes=n_classes,
                config=config,
                model_dir=training_output)# TODO: Simplify all this: https://www.tensorflow.org/guide/premade_estimatorsestimator.train(input_fn=lambda: training_input_fn(
                                    tf_transform_output, 
                                    os.path.join(trns_output, 'train' + '*'),
                                    BATCH_SIZE, 
                                    "tips"), 
                steps=STEPS)# 모델 평가
eval_result = estimator.evaluate(input_fn=lambda: training_input_fn(
                                                    tf_transform_output, 
                                                    os.path.join(trns_output, 'eval' + '*'),
                                                    BATCH_SIZE, 
                                                    "tips"), 
                                 steps=50)print(eval_result)# 모델 분석
# TODO: Implement model load and params analysisdef eval_input_receiver_fn(transformed_output):
    """Build everything needed for the tf-model-analysis to run the model.
    Args:
      transformed_output: tft.TFTransformOutput
    Returns:
      EvalInputReceiver function, which contains:
        - Tensorflow graph which parses raw untranformed features, applies the
          tf-transform preprocessing operators.
        - Set of raw, untransformed features.
        - Label against which predictions will be compared.
    """
    serialized_tf_example = tf.compat.v1.placeholder(
        dtype=tf.string, shape=[None], name='input_example_tensor')
    features = tf.io.parse_example(serialized_tf_example, transformed_output.raw_feature_spec())
    transformed_features = transformed_output.transform_raw_features(features)
    receiver_tensors = {'examples': serialized_tf_example}
    return tfma.export.EvalInputReceiver(
        features=transformed_features,
        receiver_tensors=receiver_tensors,
        labels=transformed_features[LABEL_KEY])# 모델 반출
eval_model_dir = os.path.join(training_output, 'tfma_eval_model_dir')
tfma.export.export_eval_savedmodel(
    estimator=estimator,
    export_dir_base=eval_model_dir,
    eval_input_receiver_fn=(lambda: eval_input_receiver_fn(tf_transform_output)))

쿠베플로우 노트북에서 택시 요금 계산 파이프라인을 Kale로 전개 시 아래의 화면을 볼 수 있다.

아래는 파이프라인을 실행한 결과이다

파이프라인은 워크플로우의 컴포넌트들과 머신러닝 워크플로우의 한 형식이라고 설명할 수 있다. 또한 파이프라인을 실행하기 위한 입 출력에 대한 정의도 포함된다.

 

파이프라인이 실행되면 시스템은 각 단계에 맞는 파드를 실행한다. 그 파드는 설정된 컨테이너를 실행시키고, 컨테이너안에 있는 어플리케이션을 실행한다. 스케줄러에 따라서 순서대로 컨테이너들이 실행된다.

 

전개

쿠베플로우 Kale는 노트북에서 개발된 파이프라인을 손쉽게 패키징하고, 스냅샷을 단계 별로 생성해 주며, 파이프라인 배포파일을 만들고 전개할 수 있는 기능을 제공한다. 개발자가 직접 로직으로 구현하는 번거로움없이 Kale를 사용해서 손쉽게 실행 및 파이프라인 전개를 자동화할 수 있다.

KFServing

쿠베플로우는 실시간 서빙 및 추론을 가능하게 한다. ML 프레임워크를 위한 추상화 인터페이스를 제공하기 때문에 다양한 머신러닝 프레임워크를 운영환경에서도 쉽게 사용할 수 있다. KFServing의 Endpoint, InferenceService의 스펙정의만으로 손쉽게 추론 서비스를 구축할 수 있다. 참고로 KFServing은 이스티오 등이 필요하고 복잡하기 때문에, 쉽고 러닝커브가 낮은 BentoML을 추천한다.

AutoML을 구현함에 있어서 자동화가 특히 필요한 부분은 하이퍼파라미터튜닝과 서빙이다.

  1. Katib는 하이퍼파라미터 선택을 검색 알고리즘에 따라 자동 수행한다. 설정한 최적값이 나올 때까지 반복한다 튜닝 작업은 반복적이고 시간이 많이 소요되기 때문에 이 과정을 자동화해야 한다.
  2. Serving은 모델의 재배치 및 성능측정 모니터링 등과 함께 자동화가 필요한 부분이다. 기본적인 머신러닝파이프라인 절차는 서빙으로 종료되지 않으며 서빙 이후에도 다양한 측정과 검증 등이 반복적으로 수행된다.

AutoML, MLOps, 머신러닝 파이프라인 등은 프로세스 자동화의 측면을 다루고 있다.

 

머신러닝 운영과 통합 거버넌스
이번에는 관리의 측면에서 언급한다. 다양한 입력, 출력 데이터셋이 생성되고 머신러닝파이프라인에 의해 새로운 모델이 생성되고 재배치되는 상황을 고려해 보자.

 

소수 모델이 아니고, 다수 모델을 관리해야 되는 복잡한 상황을 고려하면 이러한 데이터세트, 메타데이터, 모델을 어떻게 효율적으로 버전을 관리하고, 서로 간에 결과를 비교하고, 상호 영향도를 분석해야 한다. 결국 거버넌스의 필요성이다.

 

다양한 파이프라인에서 생성되는 결과물과 흐름을 통합적으로 관리해 주는 툴이 필요하다. 메타와 리니지 툴은 이러한 요구사항에 부합되어야 한다. 쿠베플로우는 모델 및 데이터세트 등에 대한 메타와 리니지를 관리할 수 있는 기능을 제공해 주고 있다. 머신러닝 파이프라인 내에서 생성되는 메타와 리니지 정보를 전사 거버넌스와 통합하고 통합 관리할 있다면 더욱 좋을 것이다. 즉 데이터파이프라인 빌드파이프라인 머신러닝파이프라인의 메타데이터와 데이터리니지를 통합적으로 관리할 수 있는 전사 거버넌스가 필요하다.

 

참고

디지털북스 쿠브플로우 이명환 지음

비제이퍼블릭 구글 클라우드 플랫폼 뽀개기 박정운 지음