13
01/2016
Azure Blob(二)——块Blob上传、下载、删除(未完结)
这是Azure Blob系列文章。此前文章的列表为:
这一节,我们主要学习块Blob的相关操作。块Blob上传时,需要先把文件流划分成不大于4MB的块,待所有文件块都上传完毕之后,再调用PutBlockList方法确认,此前的块便会组织成一个Blob。若不提交,则上传的blob块一周之后自动删除。
下列的代码是上传BlockBlob的相关操作,其中包括多线程上传和单线程上传。文件上传取决于带宽,因此,单线程上传和多线程上传的效率差不多。对于小文件,单线程上传甚至比多线程上传更快。对于大文件,多线程上传比单线程上传略快一些,主要是因为多线程缓存了上传文件。
未完待续。。。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.WindowsAzure;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Auth;
using Microsoft.WindowsAzure.Storage.Blob;
using System.IO;
namespace AzureBaseLib
{
/// <summary>
/// Thin wrapper around <see cref="CloudBlobClient"/> that uploads, downloads,
/// checks and deletes block blobs addressed by container name and blob name.
/// </summary>
/// <remarks>
/// Every method that goes through <c>getBlockBlob</c> (all uploads and downloads)
/// creates the container when it does not exist yet.
/// </remarks>
public class BlockBlobClient : IBlobClient
{
    // Streams longer than this many bytes (100 MB) are sent by Upload() through
    // the multi-task uploader; shorter ones use the single-call upload.
    private const long THRESHOLD = 100 * 1024 * 1024;

    private readonly CloudBlobClient _blobClient;

    /// <summary>
    /// Creates a client for the storage account described by the connection string.
    /// </summary>
    /// <param name="connectionString">Storage account connection string.</param>
    public BlockBlobClient(string connectionString)
    {
        _blobClient = CloudStorageAccount.Parse(connectionString).CreateCloudBlobClient();
    }

    /***************************************
     * Single-call upload
     **************************************/
    #region Single-call upload

    /// <summary>
    /// Uploads the content of a stream with a single SDK call; large content is accepted too.
    /// </summary>
    /// <param name="containerName">Target container (created if missing).</param>
    /// <param name="blobName">Target blob name.</param>
    /// <param name="stream">Source data.</param>
    public void UploadFromStream(string containerName, string blobName, Stream stream)
    {
        var blockBlob = getBlockBlob(containerName, blobName);
        blockBlob.UploadFromStream(stream);
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="UploadFromStream"/>.
    /// </summary>
    /// <param name="containerName">Target container (created if missing).</param>
    /// <param name="blobName">Target blob name.</param>
    /// <param name="stream">Source data.</param>
    /// <returns>A task that completes when the upload has finished.</returns>
    public Task UploadFromStreamAsync(string containerName, string blobName, Stream stream)
    {
        var blockBlob = getBlockBlob(containerName, blobName);
        return blockBlob.UploadFromStreamAsync(stream);
    }

    /// <summary>
    /// Uploads the file at the given path (intended for small files).
    /// </summary>
    /// <param name="containerName">Target container (created if missing).</param>
    /// <param name="blobName">Target blob name.</param>
    /// <param name="fileName">Path of the local file to upload.</param>
    public void UploadFromFile(string containerName, string blobName, string fileName)
    {
        var blockBlob = getBlockBlob(containerName, blobName);
        blockBlob.UploadFromFile(fileName, FileMode.Open);
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="UploadFromFile"/>.
    /// </summary>
    /// <param name="containerName">Target container (created if missing).</param>
    /// <param name="blobName">Target blob name.</param>
    /// <param name="fileName">Path of the local file to upload.</param>
    /// <returns>A task that completes when the upload has finished.</returns>
    public Task UploadFromFileAsync(string containerName, string blobName, string fileName)
    {
        var blockBlob = getBlockBlob(containerName, blobName);
        return blockBlob.UploadFromFileAsync(fileName, FileMode.Open);
    }

    /// <summary>
    /// Uploads a string as the blob content.
    /// </summary>
    /// <param name="containerName">Target container (created if missing).</param>
    /// <param name="blobName">Target blob name.</param>
    /// <param name="str">Text to upload.</param>
    public void UploadFromString(string containerName, string blobName, string str)
    {
        var blockBlob = getBlockBlob(containerName, blobName);
        blockBlob.UploadText(str);
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="UploadFromString"/>.
    /// </summary>
    /// <param name="containerName">Target container (created if missing).</param>
    /// <param name="blobName">Target blob name.</param>
    /// <param name="str">Text to upload.</param>
    /// <returns>A task that completes when the upload has finished.</returns>
    public Task UploadFromStringAsync(string containerName, string blobName, string str)
    {
        var blockBlob = getBlockBlob(containerName, blobName);
        return blockBlob.UploadTextAsync(str);
    }

    #endregion

    /***************************************
     * Multi-task upload
     **************************************/

    /// <summary>
    /// Picks the upload strategy from the stream length: content longer than
    /// <c>THRESHOLD</c> goes through <see cref="UploadMultiTask"/>, everything
    /// else through <see cref="UploadFromStream"/>.
    /// </summary>
    /// <param name="containerName">Target container (created if missing).</param>
    /// <param name="blobName">Target blob name.</param>
    /// <param name="stream">Source data.</param>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    public void Upload(string containerName, string blobName, Stream stream)
    {
        if (stream == null)
        {
            throw new ArgumentNullException("stream");
        }

        // Length is only available on seekable streams; a non-seekable stream
        // cannot be sized up front, so it takes the single-call path.
        if (stream.CanSeek && stream.Length > THRESHOLD)
        {
            UploadMultiTask(containerName, blobName, stream);
        }
        else
        {
            UploadFromStream(containerName, blobName, stream);
        }
    }

    /// <summary>
    /// Uploads a large stream block by block using several concurrent tasks.
    /// Blocks until the blob has been committed.
    /// </summary>
    /// <param name="containerName">Target container (created if missing).</param>
    /// <param name="blobName">Target blob name.</param>
    /// <param name="stream">Source data.</param>
    /// <param name="blockSize">Size of each block in bytes; null lets the worker choose its default.</param>
    /// <param name="thread">Number of concurrent upload tasks; null lets the worker choose its default.</param>
    public void UploadMultiTask(string containerName, string blobName, Stream stream, long? blockSize = null, int? thread = null)
    {
        var blob = getBlockBlob(containerName, blobName);
        BlockBlobUploadWorker worker = new BlockBlobUploadWorker(blob, stream, thread, blockSize);
        worker.Run();
    }

    #region
    /***************************************
     * Existence check
     **************************************/
    #region Existence check

    /// <summary>
    /// Tells whether the blob exists. A missing container counts as "does not exist".
    /// </summary>
    /// <param name="containerName">Container to look in.</param>
    /// <param name="blobName">Blob to look for.</param>
    /// <returns>True only when both the container and the blob exist.</returns>
    public bool CheckExist(string containerName, string blobName)
    {
        // Retrieve a reference to a container.
        CloudBlobContainer container = _blobClient.GetContainerReference(containerName);
        if (!container.Exists())
        {
            return false;
        }
        return container.GetBlockBlobReference(blobName).Exists();
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="CheckExist"/>.
    /// </summary>
    /// <param name="containerName">Container to look in.</param>
    /// <param name="blobName">Blob to look for.</param>
    /// <returns>A task yielding true only when both the container and the blob exist.</returns>
    public async Task<bool> CheckExistAsync(string containerName, string blobName)
    {
        // Retrieve a reference to a container.
        CloudBlobContainer container = _blobClient.GetContainerReference(containerName);

        // Awaited instead of read through .Result so the caller's thread is never blocked.
        if (!await container.ExistsAsync().ConfigureAwait(false))
        {
            return false;
        }
        return await container.GetBlockBlobReference(blobName).ExistsAsync().ConfigureAwait(false);
    }

    #endregion

    /***************************************
     * Delete container
     **************************************/
    #region Delete container

    /// <summary>
    /// Deletes the container if it exists.
    /// </summary>
    /// <param name="containerName">Container to delete.</param>
    public void DeleteContainer(string containerName)
    {
        CloudBlobContainer container = _blobClient.GetContainerReference(containerName);
        container.DeleteIfExists();
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="DeleteContainer"/>.
    /// </summary>
    /// <param name="containerName">Container to delete.</param>
    /// <returns>A task that completes when the request has finished.</returns>
    public Task DeleteContainerAsync(string containerName)
    {
        CloudBlobContainer container = _blobClient.GetContainerReference(containerName);
        return container.DeleteIfExistsAsync();
    }

    #endregion

    /***************************************
     * Delete blob
     **************************************/
    #region Delete blob

    /// <summary>
    /// Deletes the blob. Does nothing when the container does not exist.
    /// </summary>
    /// <param name="containerName">Container holding the blob.</param>
    /// <param name="blobName">Blob to delete.</param>
    public void Delete(string containerName, string blobName)
    {
        CloudBlobContainer container = _blobClient.GetContainerReference(containerName);
        if (!container.Exists())
        {
            return;
        }
        CloudBlockBlob blockBlob = container.GetBlockBlobReference(blobName);
        blockBlob.Delete();
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="Delete"/>.
    /// </summary>
    /// <param name="containerName">Container holding the blob.</param>
    /// <param name="blobName">Blob to delete.</param>
    /// <returns>A task that completes when the blob has been deleted or the container was found missing.</returns>
    public async Task DeleteAsync(string containerName, string blobName)
    {
        CloudBlobContainer container = _blobClient.GetContainerReference(containerName);

        // Awaited instead of read through .Result so the caller's thread is never blocked.
        if (!await container.ExistsAsync().ConfigureAwait(false))
        {
            return;
        }
        CloudBlockBlob blockBlob = container.GetBlockBlobReference(blobName);
        await blockBlob.DeleteAsync().ConfigureAwait(false);
    }

    #endregion

    /***************************************
     * Small-file download
     **************************************/
    #region Download blob

    /// <summary>
    /// Downloads the whole blob into memory.
    /// </summary>
    /// <param name="containerName">Container holding the blob.</param>
    /// <param name="blobName">Blob to download.</param>
    /// <returns>A <see cref="MemoryStream"/> positioned at 0; the caller disposes it.</returns>
    public Stream DownloadToStream(string containerName, string blobName)
    {
        var blockBlob = getBlockBlob(containerName, blobName);
        MemoryStream memStream = new MemoryStream();
        blockBlob.DownloadToStream(memStream);
        memStream.Position = 0;
        return memStream;
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="DownloadToStream"/>.
    /// </summary>
    /// <param name="containerName">Container holding the blob.</param>
    /// <param name="blobName">Blob to download.</param>
    /// <returns>A task yielding a <see cref="MemoryStream"/> positioned at 0; the caller disposes it.</returns>
    public async Task<Stream> DownloadToStreamAsync(string containerName, string blobName)
    {
        var blockBlob = getBlockBlob(containerName, blobName);
        MemoryStream memStream = new MemoryStream();

        // Await the download directly rather than parking a pool thread on Wait().
        await blockBlob.DownloadToStreamAsync(memStream).ConfigureAwait(false);
        memStream.Position = 0;
        return memStream;
    }

    /// <summary>
    /// Downloads the blob content as text.
    /// </summary>
    /// <param name="containerName">Container holding the blob.</param>
    /// <param name="blobName">Blob to download.</param>
    /// <returns>The blob content as a string.</returns>
    public string DownLoadToString(string containerName, string blobName)
    {
        var blockBlob = getBlockBlob(containerName, blobName);
        return blockBlob.DownloadText();
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="DownLoadToString"/>.
    /// </summary>
    /// <param name="containerName">Container holding the blob.</param>
    /// <param name="blobName">Blob to download.</param>
    /// <returns>A task yielding the blob content as a string.</returns>
    public Task<string> DownLoadToStringAsync(string containerName, string blobName)
    {
        var blockBlob = getBlockBlob(containerName, blobName);
        return blockBlob.DownloadTextAsync();
    }

    #endregion

    #region own function

    /// <summary>
    /// Resolves a block blob reference, creating the container first when it does not exist.
    /// </summary>
    /// <param name="containerName">Container name.</param>
    /// <param name="blobName">Blob name.</param>
    /// <returns>A reference to the block blob (the blob itself is not created).</returns>
    private CloudBlockBlob getBlockBlob(string containerName, string blobName)
    {
        // Retrieve a reference to a container.
        CloudBlobContainer container = _blobClient.GetContainerReference(containerName);
        // Create the container if it doesn't already exist.
        // NOTE(review): this also runs on the download paths, so reading from a
        // missing container creates it as a side effect — confirm this is intended.
        container.CreateIfNotExists();
        CloudBlockBlob blockBlob = container.GetBlockBlobReference(blobName);
        return blockBlob;
    }

    #endregion
    #endregion
}
/// <summary>
/// Uploads a stream to a block blob as a sequence of blocks, using several
/// concurrent upload loops, and commits the block list once every block is stored.
/// </summary>
/// <remarks>
/// The source stream is read sequentially under a lock, so block ids are
/// appended to the commit list in stream order even though the uploads
/// themselves complete in any order.
/// </remarks>
class BlockBlobUploadWorker
{
    private const long DEFAULT_BLOCK_SIZE = 2 * 1024 * 1024; // block size used when none (or an invalid one) is given
    private const int DEFAULT_THREAD = 20;                   // number of concurrent upload loops used by default
    private const long MAX_BLOCK_SIZE = 4 * 1024 * 1024;     // largest block size accepted from the caller
    private const int TRY_NUMBER = 5;                        // retries allowed after the first failed attempt

    // Guards _stream, _byteCountLeft, _blockId and _blockIdList.
    private readonly object _syncRoot = new object();
    private readonly List<string> _blockIdList = new List<string>();
    private readonly Stream _stream;
    private readonly CloudBlockBlob _blob;
    private readonly int _totalThread;
    private readonly long _blockSize;

    private int _blockId = 0;
    private long _byteCountLeft; // bytes of the stream not yet handed out as blocks

    // Set once any upload loop fails for good; tells the other loops to stop taking blocks.
    private volatile bool _isCancel = false;

    /// <summary>One block read from the source stream and waiting to be uploaded.</summary>
    private sealed class PendingBlock
    {
        public string Id;
        public byte[] Data;
    }

    /// <summary>
    /// Prepares an upload of <paramref name="stream"/> into <paramref name="blob"/>.
    /// </summary>
    /// <param name="blob">Destination block blob.</param>
    /// <param name="stream">Source data; its <c>Length</c> is read here, so it must be seekable.</param>
    /// <param name="totalThread">Number of concurrent upload loops; null or non-positive selects the default.</param>
    /// <param name="blockSize">Block size in bytes; null, non-positive or above the maximum selects the default.</param>
    /// <exception cref="ArgumentNullException"><paramref name="blob"/> or <paramref name="stream"/> is null.</exception>
    public BlockBlobUploadWorker(CloudBlockBlob blob, Stream stream, int? totalThread, long? blockSize)
    {
        if (blob == null)
        {
            throw new ArgumentNullException("blob");
        }
        if (stream == null)
        {
            throw new ArgumentNullException("stream");
        }

        _blob = blob;
        _stream = stream;
        _byteCountLeft = stream.Length;

        // A zero or negative block size would never make progress through the stream.
        if (blockSize == null || blockSize <= 0 || blockSize > MAX_BLOCK_SIZE)
        {
            blockSize = DEFAULT_BLOCK_SIZE;
        }
        // Zero loops would upload nothing, so treat it like "not specified".
        if (totalThread == null || totalThread <= 0)
        {
            totalThread = DEFAULT_THREAD;
        }
        _totalThread = (int)totalThread;
        _blockSize = (long)blockSize;
    }

    /// <summary>
    /// Uploads every block and then commits the block list. Blocks the calling
    /// thread until done. If any block cannot be uploaded, the first failure is
    /// rethrown and the block list is not committed.
    /// </summary>
    public void Run()
    {
        // Never start more loops than there are blocks; an empty stream starts none
        // and goes straight to committing an empty block list.
        long blockCount = (_byteCountLeft + _blockSize - 1) / _blockSize;
        int loopCount = (int)Math.Min((long)_totalThread, blockCount);

        Task[] loops = new Task[loopCount];
        for (int i = 0; i < loopCount; i++)
        {
            loops[i] = Task.Run((Func<Task>)UploadBlocksAsync);
        }

        try
        {
            Task.WaitAll(loops);
        }
        catch (AggregateException ex)
        {
            // Rethrow the first underlying failure with its original stack trace
            // instead of the AggregateException wrapper.
            System.Runtime.ExceptionServices.ExceptionDispatchInfo
                .Capture(ex.Flatten().InnerExceptions[0])
                .Throw();
        }

        PutBlockList();
    }

    /// <summary>
    /// One upload loop: repeatedly takes the next block from the stream and
    /// uploads it, until the stream is exhausted or another loop has failed.
    /// </summary>
    private async Task UploadBlocksAsync()
    {
        try
        {
            PendingBlock block;
            while ((block = ReadNextBlock()) != null)
            {
                await PutBlockAsync(block.Id, block.Data).ConfigureAwait(false);
            }
        }
        catch
        {
            // Stop the sibling loops from reading further blocks; Run() rethrows.
            _isCancel = true;
            throw;
        }
    }

    /// <summary>
    /// Reads the next block from the source stream and registers its id in the commit list.
    /// </summary>
    /// <returns>The block to upload, or null when nothing is left or the upload was cancelled.</returns>
    private PendingBlock ReadNextBlock()
    {
        lock (_syncRoot)
        {
            if (_isCancel || _byteCountLeft <= 0)
            {
                return null;
            }

            int wanted = (int)Math.Min(_byteCountLeft, _blockSize);
            byte[] data = new byte[wanted];

            // Stream.Read may return fewer bytes than requested, so keep reading
            // until the block is full or the stream ends.
            int filled = 0;
            while (filled < wanted)
            {
                int read = _stream.Read(data, filled, wanted - filled);
                if (read == 0)
                {
                    break;
                }
                filled += read;
            }

            if (filled < wanted)
            {
                // The stream ended before Length bytes were delivered (for example
                // it was not positioned at 0); upload only what was actually read.
                _byteCountLeft = 0;
                if (filled == 0)
                {
                    return null;
                }
                Array.Resize(ref data, filled);
            }
            else
            {
                _byteCountLeft -= wanted;
            }

            PendingBlock block = new PendingBlock();
            block.Id = Convert.ToBase64String(BitConverter.GetBytes(_blockId));
            block.Data = data;
            _blockId++;
            _blockIdList.Add(block.Id);
            return block;
        }
    }

    /// <summary>
    /// Commits the uploaded blocks, retrying up to <c>TRY_NUMBER</c> times
    /// before letting the last failure propagate.
    /// </summary>
    private void PutBlockList()
    {
        int tryNumber = 0;
        while (true)
        {
            try
            {
                _blob.PutBlockList(_blockIdList);
                break;
            }
            catch
            {
                tryNumber++;
                if (tryNumber > TRY_NUMBER)
                {
                    throw;
                }
            }
        }
    }

    /// <summary>
    /// Uploads one block, retrying up to <c>TRY_NUMBER</c> times before
    /// letting the last failure propagate.
    /// </summary>
    /// <param name="blockId">Base64 block id.</param>
    /// <param name="buffer">Block content.</param>
    private async Task PutBlockAsync(string blockId, byte[] buffer)
    {
        using (MemoryStream stream = new MemoryStream(buffer))
        {
            int tryNumber = 0;
            while (true)
            {
                try
                {
                    // A failed attempt may have consumed part of the stream;
                    // rewind so each attempt sends the whole block.
                    stream.Position = 0;
                    await _blob.PutBlockAsync(blockId, stream, null).ConfigureAwait(false);
                    break;
                }
                catch
                {
                    tryNumber++;
                    if (tryNumber > TRY_NUMBER)
                    {
                        throw;
                    }
                }
            }
        }
    }
}
}
单线程的下载测试代码:
// Downloads the blob "测试.rar" from container "test1" and saves it to a local file.
BlockBlobClient blobClient = new BlockBlobClient("你的连接字符串");
using (Stream stream = blobClient.DownloadToStream("test1", "测试.rar"))
// FileMode.Create truncates an existing file; OpenOrCreate would leave stale
// trailing bytes behind when the old file is larger than the downloaded blob.
using (FileStream fs = new FileStream(@"d:\data\测试.rar", FileMode.Create, FileAccess.Write))
{
    // CopyTo loops until the source is exhausted, so no manual buffer is needed.
    stream.CopyTo(fs);
}
Console.WriteLine("Finished!");
Console.ReadKey();
多线程上传测试代码:
/// <summary>
/// Multi-task upload demo: uploads a local file to blob "测试1.rar" in container "test1".
/// </summary>
static void Main(string[] args)
{
    BlockBlobClient blobClient = new BlockBlobClient("你的连接字符串");
    using (FileStream stream = new FileStream(@"D:\Data\FJData.rar", FileMode.Open, FileAccess.Read))
    {
        // BlockBlobClient has no UploadBigFile method; the multi-task upload
        // entry point is UploadMultiTask.
        blobClient.UploadMultiTask("test1", "测试1.rar", stream);
    }
    Console.WriteLine("Finished!");
    Console.ReadKey();
}
0 条评论