閱讀934 返回首頁    go iPhone_iPad_Mac_手機_平板_蘋果apple


Python SDK實例程序__最佳實踐_歸檔存儲-阿里雲

Python SDK 簡易使用示例

本節以示例的方式展示如何使用SDK高級接口進行開發。用戶在閱讀本節後,可模仿示例,並參考高級接口一節進行開發。

其中,方括號內的參數用戶應根據實際需求進行替換。

  • 創建 Vault
from oas.oas_api import OASAPI
from oas.ease.vault import Vault

# Create the OASAPI client object; replace the bracketed placeholders
# with your own endpoint and credentials.
api = OASAPI('[Server Host]', '[Access Key ID]', '[Access Key Secret]')

# Create the vault with the given name.
vault = Vault.create_vault(api, '[Vault Name]')
  • 查找Vault
# Create the OASAPI client object (OASAPI and Vault must already be imported).
api = OASAPI('[Server Host]', '[Access Key ID]', '[Access Key Secret]')

# Look up the vault by its name.
vault = Vault.get_vault_by_name(api, '[Vault Name]')

# Look up the vault by its ID.
vault = Vault.get_vault_by_id(api, '[Vault ID]')
  • 上傳文件
# Upload the file at the given path and keep the returned archive ID.
archive_id = vault.upload_archive('[File Path]')
  • 刪除 Archive
# Delete the archive identified by the given archive ID.
vault.delete_archive('[Archive ID]')
  • 續傳 Multipart Upload 任務
# Recover the uploader of an existing multipart upload from its upload ID,
# then resume the upload from the local file.
uploader = vault.recover_uploader('[Upload ID]')
uploader.resume('[File Path]')
  • 獲取 Archive 列表
# Submit an inventory-retrieval job for the vault and save the job's
# output (the archive list) to a local file.
job = vault.retrieve_inventory()
job.download_to_file('[File Path]')
  • 下載 Archive
# Submit an archive-retrieval job for the given archive ID and save the
# job's output to a local file.
job = vault.retrieve_archive('[Archive ID]')
job.download_to_file('[File Path]')
  • 從OSS上轉儲Object到OAS
# Submit a job that transfers an OSS object into this vault. Replace the
# bracketed placeholders with your own values; the last argument is a
# free-form job description.
job = vault.pull_from_oss('[OSS Host]', '[Bucket Name]', '[Object Name]', "test desc")
  • 從OAS上轉儲Archive到OSS
# Submit a job that transfers an archive from this vault to OSS. Replace the
# bracketed placeholders with your own values.
# NOTE(review): the fourth argument is presumably the target OSS object name
# (the full demo passes the archive ID there) -- confirm against the SDK.
job = vault.push_to_oss('[Archive ID]', '[OSS Host]', '[Bucket Name]', '[Object Name]', "test desc")

Python SDK 完整使用演示代碼

下面函數中test_single_archive_upload()提供單一文件archive上傳;

函數test_multi_upload() 使用sdk低級接口實現分段上傳;

函數test_uploader()使用sdk高級接口實現數據上傳(當數據大於64MB時,會自動選擇分段上傳);

函數test_vault_retrieve()實現vault信息查詢;

函數test_download_archive(archive_id)實現archive下載;

函數test_delete_archive(archive_id)實現archive刪除。

函數test_pull_from_oss() 實現從OSS直接轉儲到OAS

函數test_push_to_oss() 實現從OAS直接轉儲到OSS

import random
import time 
import logging

import logging.handlers  
from oas.oas_api import OASAPI
from oas.ease.api import APIProxy
from oas.ease.exceptions import *
from oas.ease.response import *
from oas.ease.utils import *
from oas.ease.vault import *
from oas.ease.uploader import *
from oas.ease.job import *

import os

# Send debug-level log records to a size-rotated file: up to 1 MB per file,
# keeping 5 backups.
# NOTE(review): `log` is not defined in this file; presumably it is exported
# by one of the `from oas.ease... import *` lines above -- confirm.
LOG_FILE = "test.log"
fmt = '%(asctime)s - %(filename)s:%(lineno)s - %(name)s - %(message)s'
formatter = logging.Formatter(fmt)

handler = logging.handlers.RotatingFileHandler(
    LOG_FILE,
    maxBytes=1024 * 1024,
    backupCount=5,
)
handler.setFormatter(formatter)

log.addHandler(handler)
log.setLevel(logging.DEBUG)


class TestConf(object):
    """Settings used by the demo.

    The endpoint, credential and OSS values are placeholders that must be
    replaced with real values before the demo is run.
    """

    def __init__(self):
        # OAS endpoint and account credentials.
        self.host = 'oas域名'
        self.accesskey_id = "您的access key Id"
        self.accesskey_secret = "您的access key secret"

        # Names of the vaults the demo works with.
        self.vault_name, self.vault_name_test = "normal", "test"

        # OSS location used by the pull-from-oss / push-to-oss demos.
        self.osshost = "您要轉儲的oss域名"
        self.bucket = "您要轉儲的bucket"
        self.object = "您要轉儲的Object"


# Module-wide configuration instance read by the demo code.
conf = TestConf()


class TestDemo(object):
    """End-to-end demo of the OAS Python SDK.

    Each ``test_*`` method runs one workflow against the vault named by
    ``conf.vault_name``: single-request upload, multipart upload through the
    low level proxy API, upload through the high level uploader, inventory
    retrieval, archive download, archive deletion, and transfers between
    OSS and OAS.
    """

    _MEGABYTE = 1024 * 1024

    def __init__(self):
        """Create the API clients and generate the two local sample files.

        NOTE: every instantiation writes a 200 MB file and a 30 MB file into
        the current working directory.
        """
        self.api = OASAPI(conf.host, conf.accesskey_id, conf.accesskey_secret)
        self.api_proxy = APIProxy(self.api)
        self.vault_name = conf.vault_name
        self.vault_name_test = conf.vault_name_test

        # The file name says 100M but the size below is 200 MB; the name is
        # left unchanged.
        self.file_big = "random100M.bin"
        self.file_big_size = 200 * self._MEGABYTE
        with open(self.file_big, 'wb+') as f:
            self.write_random_data(f, self.file_big_size)

        self.file_small = "random30M.bin"
        self.file_small_size = 30 * self._MEGABYTE
        with open(self.file_small, 'wb+') as f:
            self.write_random_data(f, self.file_small_size)

    def write_random_data(self, file, size):
        """Write exactly ``size`` bytes of sample data to ``file``.

        ``file`` must be a file object opened for binary writing. The data
        alternates a block of up to 1 KB from ``os.urandom`` with a run of
        zero bytes whose length is a random multiple of 1 KB between 256 KB
        and 1 MB, truncated so that the total is ``size`` bytes.
        """
        remaining = size
        while remaining > 0:
            n = min(remaining, 1024)
            file.write(os.urandom(n))
            remaining -= n

            # The filler must really be n bytes long: writing an empty string
            # here would decrement `remaining` without writing anything and
            # leave the file shorter than `size`.
            n = min(remaining, 1024 * random.randint(256, 1024))
            file.write(b'\0' * n)
            remaining -= n

    @staticmethod
    def _iter_parts(total_size, part_size):
        """Yield ``(offset, length)`` for each consecutive part of a file."""
        offset = 0
        while offset < total_size:
            length = min(part_size, total_size - offset)
            yield offset, length
            offset += length

    def test_single_archive_upload(self):
        """Upload the small sample file in one request via the low level API.

        Returns the archive id taken from the ``x-oas-archive-id`` entry of
        the response.
        """
        print('''
########################################################
 Using the low level api method to upload a small archive 
#########################################################''')
        res = self.api_proxy.create_vault(self.vault_name)
        vault_id = res['x-oas-vault-id']

        etag = compute_etag_from_file(self.file_small)
        tree_etag = compute_tree_etag_from_file(self.file_small)
        with open(self.file_small, 'rb') as f:
            content = f.read()
        res = self.api_proxy.post_archive(vault_id, content, etag, tree_etag)
        archive_id = res['x-oas-archive-id']
        print("archive_id %s" % (archive_id,))
        return archive_id

    # test the multipart upload by the proxy api, and this is low level api
    def test_multi_upload(self):
        """Upload the big sample file in 64 MB parts via the low level API.

        The etag and tree etag of every part are computed first, then a
        multipart upload is created, each part is posted with its byte range,
        and the upload is completed with the combined tree etag.

        Returns the archive id taken from the ``x-oas-archive-id`` entry of
        the completion response.
        """
        print('''\n\n\n
########################################################
 Using the low level api to invoke the multipart api and implement the archive uplod
#########################################################''')
        res = self.api_proxy.create_vault(self.vault_name)
        vault_id = res['x-oas-vault-id']

        part_size = 64 * self._MEGABYTE
        # Computed once so the etag pass and the upload pass are guaranteed
        # to use identical part boundaries (including a short final part).
        parts = list(self._iter_parts(self.file_big_size, part_size))

        print("1. comput the etag,tree-etag")
        print("1.1 comput the  etag , tree_etag of part")
        etag_array = []
        tree_etag_array = []
        for offset, length in parts:
            etag_array.append(
                compute_etag_from_file(self.file_big, offset, length))
            tree_etag_array.append(
                compute_tree_etag_from_file(self.file_big, offset, length))

        print("1.2 comput the  total tree_etag of the archive")
        tree_etag_total = compute_combine_tree_etag_from_list(tree_etag_array)

        print("2.1 init the upload task, and get the uploadId")
        res = self.api_proxy.create_multipart_upload(vault_id, part_size)
        upload_id = res['x-oas-multipart-upload-id']
        print("upload_id %s" % (upload_id,))

        # The same open file object is passed for every part, in order, and
        # is closed even if one of the uploads raises.
        with open(self.file_big, 'rb') as f:
            for part_num, (offset, length) in enumerate(parts):
                print("2.2 upload every part to oas server, and the etag of the part will be used. current part is: %d" % (part_num + 1))
                # Inclusive byte range "first-last" of this part.
                byte_range = '%d-%d' % (offset, offset + length - 1)
                self.api_proxy.post_multipart_from_reader(
                    vault_id, upload_id, f, length, byte_range,
                    etag_array[part_num], tree_etag_array[part_num])

        print("2.3 complete the multipart task, and the total etag will be used")
        res = self.api_proxy.complete_multipart_upload(
            vault_id, upload_id, self.file_big_size, tree_etag_total)
        print("output the archive id")
        archive_id = res['x-oas-archive-id']
        print("archive_id %s" % (archive_id,))
        return archive_id

    # test the uploader to upload big file, and this is high level api
    def test_uploader(self):
        """Upload the big sample file with the high level uploader.

        Returns the archive id returned by ``uploader.start()``.
        """
        print('''\n\n\n
########################################################
 Using the High level api to invoke the multipart api 
#########################################################''')
        vault = Vault.create_vault(self.api, self.vault_name)

        print("initial a uploadId")
        uploader = vault.initiate_uploader(self.file_big)
        uploader_id = uploader.id
        print("uploader_id %s" % (uploader_id,))

        print("start the multipart")
        archive_id = uploader.start()

        print("finish the upload, and output the archive_id")
        print("archive_id %s" % (archive_id,))
        return archive_id

    # to inquire the archive list of vault
    def test_vault_retrieve(self):
        """Submit an inventory job and save its output to a local file.

        The output file is named after the job id.
        """
        print('''\n\n\n
########################################################
 Retrieve the vault info, and inquire the archive list of the vault 
#########################################################''')
        vault = Vault.create_vault(self.api, self.vault_name)
        job = vault.retrieve_inventory()
        job.download_to_file(job.id)

    # to test download archive
    def test_download_archive(self, archive_id):
        """Submit a retrieval job for ``archive_id`` and download its output.

        The output file is named after the job id.
        """
        print('''\n\n\n
########################################################
 Submit the archive job and download the job to local 
#########################################################''')
        vault = Vault.create_vault(self.api, self.vault_name)
        job = vault.retrieve_archive(archive_id)
        job.update_status()
        # Download unconditionally: guarding this with `not job.completed`
        # would skip the download exactly when the data is already available.
        # NOTE(review): block=True presumably waits for the job to finish
        # before downloading -- confirm against the SDK.
        job.download_to_file(file_path=job.id, block=True)

    # test delete archive
    def test_delete_archive(self, archive_id):
        """Delete the archive ``archive_id`` from the demo vault."""
        print('''\n\n\n
########################################################
 Delete the archive 
#########################################################''')
        vault = Vault.create_vault(self.api, self.vault_name)
        vault.delete_archive(archive_id)

    def test_pull_from_oss(self):
        """Submit a pull-from-oss job for the object configured in ``conf``.

        Waits on the job via ``job._check_status(block=True)`` and then
        prints the resulting archive id.
        """
        print('''\n\n\n
########################################################
 submit a pull-from-oss job and OAS will finish the pull of OSS object to OAS 
#########################################################''')
        vault = Vault.create_vault(self.api, self.vault_name)
        # submit the transfer job and wait on its status
        job = vault.pull_from_oss(conf.osshost, conf.bucket, conf.object, "test desc")
        job._check_status(block=True)

        # report the archive created by the job
        print("bucket:%s object:%s finish pull from oss,\n archiveId:%s" % (conf.bucket, conf.object, job.archive_id))

    def test_push_to_oss(self):
        """Upload the big sample file, then push the archive to OSS.

        The archive id is also passed as the fourth argument of
        ``push_to_oss`` (presumably the target OSS object name -- confirm
        against the SDK).
        """
        print('''\n\n\n
########################################################
 submit a push-to-oss job and OAS will finish the push of OAS archive to OSS 
#########################################################''')
        vault = Vault.create_vault(self.api, self.vault_name)
        archive_id = vault.upload_archive(self.file_big)
        job = vault.push_to_oss(archive_id, conf.osshost, conf.bucket, archive_id, "test desc")
        job._check_status(block=True)
        print(archive_id + " finish push to oss")

if __name__ == '__main__':
    # Run every demo workflow in order. Constructing TestDemo also
    # generates the local sample files.
    demo = TestDemo()

    # Low level API uploads.
    demo.test_single_archive_upload()
    demo.test_multi_upload()

    # High level upload; the resulting archive is then downloaded and deleted.
    uploaded_archive_id = demo.test_uploader()

    demo.test_vault_retrieve()
    demo.test_download_archive(uploaded_archive_id)
    demo.test_delete_archive(uploaded_archive_id)

    # Transfers between OSS and OAS.
    demo.test_pull_from_oss()
    demo.test_push_to_oss()

最後更新:2016-11-23 17:16:04

  上一篇:go Java SDK下載__SDK下載_SDK使用手冊_歸檔存儲-阿里雲
  下一篇:go Java SDK實例程序__最佳實踐_歸檔存儲-阿里雲