本文章利用 Python 实现一个简单的功能较为完善的区块链系统(包括区块链结构、账户、钱包、转账),采用的共识机制是 POW。 一、区块与区块链结构Block.py import hashlib from datetime import datetime class Block: """ 区块链结构: prev_hash: 父区块哈希值 data: 区块内容 timestamp: 区块创建时间 hash: 区块哈希值 """ def __init__(self, data, prev_hash): # 将传入的父区块哈希值和数据保存到变量中 self.prev_hash = prev_hash self.data = data # 获得当前的时间 self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 计算区块哈希值 # 获取哈希对象 message = hashlib.sha256() # 先将数据内容转为字符串并进行编码,再将它们哈希 # 注意:update() 方法现在只接受 bytes 类型的数据,不接收 str 类型 message.update(str(self.prev_hash).encode('utf-8')) message.update(str(self.prev_hash).encode('utf-8')) message.update(str(self.prev_hash).encode('utf-8')) # update() 更新 hash 对象,连续的调用该方法相当于连续的追加更新 # 返回字符串类型的消息摘要 self.hash = message.hexdigest()BlockChain.py from Block import Block class BlockChain: """ 区块链结构体 blocks: 包含区块的列表 """ def __init__(self): self.blocks = [] def add_block(self, block): """ 添加区块 :param block: :return: """ self.blocks.append(block) # 新建区块 genesis_block = Block(data="创世区块", prev_hash="") new_block1 = Block(data="张三转给李四一个比特币", prev_hash=genesis_block.hash) new_block2 = Block(data="张三转给王五三个比特币", prev_hash=genesis_block.hash) # 新建一个区块链对象 blockChain = BlockChain() # 将刚才新建的区块加入区块链 blockChain.add_block(genesis_block) blockChain.add_block(new_block1) blockChain.add_block(new_block2) # 打印区块链信息 print("区块链包含区块个数为:%d\n" % len(blockChain.blocks)) blockHeight = 0 for block in blockChain.blocks: print(f"本区块高度为:{blockHeight}") print(f"父区块哈希:{block.prev_hash}") print(f"区块内容:{block.data}") print(f"区块哈希:{block.hash}") print() blockHeight += 1测试结果 二、加入工作量证明(POW)将工作量证明加入到 Block.py 中 import hashlib from datetime import datetime from time import time class Block: """ 区块链结构: prev_hash: 父区块哈希值 data: 区块内容 timestamp: 区块创建时间 hash: 区块哈希值 nonce: 随机数 """ def __init__(self, data, prev_hash): # 将传入的父区块哈希值和数据保存到变量中 self.prev_hash = prev_hash self.data = data # 获得当前的时间 self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 设置随机数、哈希初始值为 None self.nonce = None self.hash = None # 类的 __repr__() 方法定义了实例化对象的输出信息 def __repr__(self): return f"区块内容:{self.data}\n区块哈希值:{self.hash}" class ProofOfWork: """ 工作量证明: block: 区块 difficulty: 难度值 """ def __init__(self, block, difficult=5): self.block = block # 定义出块难度,默认为 5,表示有效哈希值以 5 个零开头 self.difficulty = difficult def mine(self): """ 挖矿函数 :return: """ i = 0 prefix = '0' * self.difficulty while True: message = hashlib.sha256() message.update(str(self.block.prev_hash).encode('utf-8')) message.update(str(self.block.data).encode('utf-8')) message.update(str(self.block.timestamp).encode('utf-8')) message.update(str(i).encode('utf-8')) # digest() 返回摘要,作为二进制数据字符串值 # hexdigest() 返回摘要,作为十六进制数据字符串值 digest = message.hexdigest() # str.startswith(prefix) 检测字符串是否是以 prefix(字符串)开头,返回布尔值 if digest.startswith(prefix): # 幸运数字 self.block.nonce = i # 区块哈希值为十六进制数据字符串摘要 self.block.hash = digest return self.block i += 1 def validate(self): """ 验证有效性 :return: """ message = hashlib.sha256() message.update(str(self.block.prev_hash).encode('utf-8')) message.update(str(self.block.data).encode('utf-8')) message.update(str(self.block.timestamp).encode('utf-8')) message.update(str(self.block.nonce).encode('utf-8')) digest = message.hexdigest() prefix = '0' * self.difficulty return digest.startswith(prefix) # ++++++++测试++++++++ # 定义一个区块 b = Block(data="测试", prev_hash="") # 定义一个工作量证明 w = ProofOfWork(b) # 开始时间 start_time = time() # 挖矿,并统计函数执行时间 print("+++开始挖矿+++") valid_block = w.mine() # 结束时间 end_time = time() print(f"挖矿花费时间:{end_time - start_time}秒") # 验证区块 print(f"区块哈希值是否符合规则:{w.validate()}") print(f"区块哈希值为:{b.hash}")测试结果 更新 BlockChain.py from Block import Block, ProofOfWork class BlockChain: """ 区块链结构体 blocks: 包含区块的列表 """ def __init__(self): self.blocks = [] def add_block(self, block): """ 添加区块 :param block: :return: """ self.blocks.append(block) # 新建一个区块链对象 blockChain = BlockChain() # 新建区块 block1 = Block(data="创世区块", prev_hash="") w1 = ProofOfWork(block1) genesis_block = w1.mine() blockChain.add_block(genesis_block) block2 = Block(data="张三转给李四一个比特币", prev_hash=genesis_block.hash) w2 = ProofOfWork(block2) block = w2.mine() blockChain.add_block(block) block3 = Block(data="张三转给王五三个比特币", prev_hash=block.hash) w3 = ProofOfWork(block3) block = w3.mine() blockChain.add_block(block) # 打印区块链信息 print("区块链包含区块个数为:%d\n" % len(blockChain.blocks)) blockHeight = 0 for block in blockChain.blocks: print(f"本区块高度为:{blockHeight}") print(f"父区块哈希:{block.prev_hash}") print(f"区块内容:{block.data}") print(f"区块哈希:{block.hash}") print() blockHeight += 1测试结果 三、实现钱包、账户、交易功能实现钱包、账户、交易功能要先安装非对称加密算法库 ecdsa。如果网速慢,引用下面这个网站 -i https://pypi.tuna.tsinghua.edu.cn/simple 添加钱包、账户功能 Wallet.py import base64 import binascii from hashlib import sha256 # 导入椭圆曲线算法 from ecdsa import SigningKey, SECP256k1, VerifyingKey class Wallet: """ 钱包 """ def __init__(self): """ 钱包初始化时基于椭圆曲线生成一个唯一的秘钥对,代表区块链上一个唯一的账户 """ # 生成私钥 self._private_key = SigningKey.generate(curve=SECP256k1) # 基于私钥生成公钥 self._public_key = self._private_key.get_verifying_key() @property def address(self): """ 这里通过公钥生成地址 """ h = sha256(self._public_key.to_pem()) # 地址先由公钥进行哈希算法,再进行 Base64 计算而成 return base64.b64encode(h.digest()) @property def pubkey(self): """ 返回公钥字符串 """ return self._public_key.to_pem() def sign(self, message): """ 生成数字签名 """ h = sha256(message.encode('utf8')) # 利用私钥生成签名 # 签名生成的是一串二进制字符串,为了便于查看,这里转换为 ASCII 字符串进行输出 return binascii.hexlify(self._private_key.sign(h.digest())) def verify_sign(pubkey, message, signature): """ 验证签名 """ verifier = VerifyingKey.from_pem(pubkey) h = sha256(message.encode('utf8')) return verifier.verify(binascii.unhexlify(signature), h.digest())实现转账功能 Transaction.py import json class Transaction: """ 交易的结构 """ def __init__(self, sender, recipient, amount): """ 初始化交易,设置交易的发送方、接收方和交易数量 """ # 交易发送者的公钥 self.pubkey = None # 交易的数字签名 self.signature = None if isinstance(sender, bytes): sender = sender.decode('utf-8') self.sender = sender # 发送方 if isinstance(recipient, bytes): recipient = recipient.decode('utf-8') self.recipient = recipient # 接收方 self.amount = amount # 交易数量 def set_sign(self, signature, pubkey): """ 为了便于验证这个交易的可靠性,需要发送方输入他的公钥和签名 """ self.signature = signature # 签名 self.pubkey = pubkey # 发送方公钥 def __repr__(self): """ 交易大致可分为两种,一是挖矿所得,而是转账交易 挖矿所得无发送方,以此进行区分显示不同内容 """ if self.sender: s = f"从{self.sender}转自{self.recipient}{self.amount}个加密货币" elif self.recipient: s = f"{self.recipient}挖矿所得{self.amount}个加密货币" else: s = "error" return s class TransactionEncoder(json.JSONEncoder): """ 定义Json的编码类,用来序列化Transaction """ def default(self, obj): if isinstance(obj, Transaction): return obj.__dict__ else: return json.JSONEncoder.default(self, obj) # return super(TransactionEncoder, self).default(obj)更新 Block.py import hashlib import json from datetime import datetime from Transaction import Transaction, TransactionEncoder class Block: """ 区块结构 prev_hash: 父区块哈希值 transactions: 交易对 timestamp: 区块创建时间 hash: 区块哈希值 Nonce: 随机数 """ def __init__(self, transactions, prev_hash): # 将传入的父哈希值和数据保存到类变量中 self.prev_hash = prev_hash # 交易列表 self.transactions = transactions # 获取当前时间 self.timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 设置Nonce和哈希的初始值为None self.nonce = None self.hash = None # 类的 __repr__() 方法定义了实例化对象的输出信息 def __repr__(self): return f"区块内容:{self.transactions}\n区块哈希值:{self.hash}" class ProofOfWork: """ 工作量证明 block: 区块 difficulty: 难度值 """ def __init__(self, block, miner, difficult=5): self.block = block self.miner = miner # 定义工作量难度,默认为5,表示有效的哈希值以5个“0”开头 self.difficulty = difficult # 添加挖矿奖励 self.reward_amount = 1 def mine(self): """ 挖矿函数 """ i = 0 prefix = '0' * self.difficulty # 设置挖矿自动生成交易信息,添加挖矿奖励 t = Transaction( sender="", recipient=self.miner.address, amount=self.reward_amount, ) sig = self.miner.sign(json.dumps(t, cls=TransactionEncoder)) t.set_sign(sig, self.miner.pubkey) self.block.transactions.append(t) while True: message = hashlib.sha256() message.update(str(self.block.prev_hash).encode('utf-8')) # 更新区块中的交易数据 # message.update(str(self.block.data).encode('utf-8')) message.update(str(self.block.transactions).encode('utf-8')) message.update(str(self.block.timestamp).encode('utf-8')) message.update(str(i).encode("utf-8")) digest = message.hexdigest() if digest.startswith(prefix): self.block.nonce = i self.block.hash = digest return self.block i += 1 def validate(self): """ 验证有效性 """ message = hashlib.sha256() message.update(str(self.block.prev_hash).encode('utf-8')) # 更新区块中的交易数据 # message.update(str(self.block.data).encode('utf-8')) message.update(json.dumps(self.block.transactions).encode('utf-8')) message.update(str(self.block.timestamp).encode('utf-8')) message.update(str(self.block.nonce).encode('utf-8')) digest = message.hexdigest() prefix = '0' * self.difficulty return digest.startswith(prefix)更新 BlockChain.py from Block import Block, ProofOfWork from Transaction import Transaction from Wallet import Wallet, verify_sign class BlockChain: """ 区块链结构体 blocks: 包含的区块列表 """ def __init__(self): self.blocks = [] def add_block(self, block): """ 添加区块 """ self.blocks.append(block) def print_list(self): print(f"区块链包含个数为:{len(self.blocks)}") for block in self.blocks: height = 0 print(f"区块链高度为:{height}") print(f"父区块为:{block.prev_hash}") print(f"区块内容为:{block.transactions}") print(f"区块哈希值为:{block.hash}") height += 1 print()为了方便我们对区块链进行操作,我们可以在 BlockChain.py 中补充一些方法 # 传入用户和区块链,返回用户的“余额” def get_balance(user, blockchain): balance = 0 for block in blockchain.blocks: for t in block.transactions: if t.sender == user.address.decode(): balance -= t.amount elif t.recipient == user.address.decode(): balance += t.amount return balance # user生成创世区块(新建区块链),并添加到区块链中 def generate_genesis_block(user): blockchain = BlockChain() new_block = Block(transactions=[], prev_hash="") w = ProofOfWork(new_block, user) genesis_block = w.mine() blockchain.add_block(genesis_block) # 返回创世区块 return blockchain # 用户之间进行交易并记入交易列表 def add_transaction(sender, recipient, amount): # 新建交易 new_transaction = Transaction( sender=sender.address, recipient=recipient.address, amount=amount ) # 生成数字签名 sig = sender.sign(str(new_transaction)) # 传入付款方的公钥和签名 new_transaction.set_sign(sig, sender.pubkey) return new_transaction # 验证交易,若验证成功则加入交易列表 def verify_new_transaction(new_transaction, transactions): if verify_sign(new_transaction.pubkey, str(new_transaction), new_transaction.signature ): # 验证交易签名没问题,加入交易列表 print("交易验证成功") transactions.append(new_transaction) else: print("交易验证失败") # 矿工将全部验证成功的交易列表打包出块 def generate_block(miner, transactions, blockchain): new_block = Block(transactions=transactions, prev_hash=blockchain.blocks[len(blockchain.blocks) - 1].hash) print("生成新的区块...") # 挖矿 w = ProofOfWork(new_block, miner) block = w.mine() print("将新区块添加到区块链中") blockchain.add_block(block)进行测试 # 新建交易列表 transactions = [] # 创建 3 个用户 alice = Wallet() tom = Wallet() bob = Wallet() print("alice创建创世区块...") blockchain = generate_genesis_block(alice) print() print(f"alice 的余额为{get_balance(alice, blockchain)}个比特币") print(f"tom 的余额为{get_balance(tom, blockchain)}个比特币") print(f"bob 的余额为{get_balance(bob, blockchain)}个比特币") print() # 打印区块链信息 blockchain.print_list() print("新增交易:alice 转账 0.5 比特币给 tom") nt = add_transaction(alice, tom, 0.5) print() verify_new_transaction(nt, transactions) print(f"矿工 bob 将全部验证成功的交易列表打包出块...") generate_block(bob, transactions, blockchain) print("添加完成\n") # 打印区块链信息 blockchain.print_list()测试结果 (责任编辑:) |