# -*- coding: utf-8 -*-
from aiobitcoin.tools import ecdsa
from aiobitcoin.tools.encoding import EncodingError, a2b_hashed_base58, \
from_bytes_32, hash160, hash160_sec_to_bitcoin_address, \
is_sec_compressed, public_pair_to_sec, public_pair_to_hash160_sec, \
sec_to_public_pair, secret_exponent_to_wif
from aiobitcoin.tools.key.validate import netcode_and_type_for_data
from aiobitcoin.tools.networks import address_prefix_for_netcode, wif_prefix_for_netcode
from aiobitcoin.tools.networks.default import get_current_netcode
from aiobitcoin.tools.serialize import b2h
from aiobitcoin.tools.tx.script.der import sigencode_der, sigdecode_der
[docs]class InvalidPublicPairError(ValueError):
pass
[docs]class InvalidSecretExponentError(ValueError):
pass
[docs]class Key(object):
def __init__(self, secret_exponent=None, public_pair=None, hash160=None,
prefer_uncompressed=None, is_compressed=None, is_pay_to_script=False, netcode=None):
"""
secret_exponent:
a long representing the secret exponent
public_pair:
a tuple of long integers on the ecdsa curve
hash160:
a hash160 value corresponding to a bitcoin address
Include at most one of secret_exponent, public_pair or hash160.
prefer_uncompressed:
whether or not to produce text outputs as compressed or uncompressed.
is_pay_to_script:
whether or not this key is for a pay-to-script style transaction
netcode:
the code for the network (as defined in pycoin.networks)
Include at most one of secret_exponent, public_pair or hash160.
prefer_uncompressed, is_compressed (booleans) are optional.
"""
if is_compressed is None:
is_compressed = False if hash160 else True
if netcode is None:
netcode = get_current_netcode()
if [secret_exponent, public_pair, hash160].count(None) != 2:
raise ValueError("exactly one of secret_exponent, public_pair, hash160 must be passed.")
if prefer_uncompressed is None:
prefer_uncompressed = not is_compressed
self._prefer_uncompressed = prefer_uncompressed
self._secret_exponent = secret_exponent
self._public_pair = public_pair
self._hash160_uncompressed = None
self._hash160_compressed = None
if hash160:
if is_compressed:
self._hash160_compressed = hash160
else:
self._hash160_uncompressed = hash160
self._netcode = netcode
if self._public_pair is None and self._secret_exponent is not None:
if self._secret_exponent < 1 \
or self._secret_exponent >= ecdsa.generator_secp256k1.order():
raise InvalidSecretExponentError()
public_pair = ecdsa.public_pair_for_secret_exponent(
ecdsa.generator_secp256k1, self._secret_exponent)
self._public_pair = public_pair
if self._public_pair is not None \
and (None in self._public_pair or
not ecdsa.is_public_pair_valid(ecdsa.generator_secp256k1, self._public_pair)):
raise InvalidPublicPairError()
[docs] @classmethod
def from_text(cls, text, is_compressed=False):
"""
This function will accept a BIP0032 wallet string, a WIF, or a bitcoin address.
The "is_compressed" parameter is ignored unless a public address is passed in.
"""
data = a2b_hashed_base58(text)
netcode, key_type, length = netcode_and_type_for_data(data)
data = data[1:]
if key_type in ("pub32", "prv32"):
# : fix this... it doesn't belong here
from aiobitcoin.tools.key.BIP32Node import BIP32Node
return BIP32Node.from_wallet_key(text)
if key_type == 'wif':
is_compressed = (len(data) > 32)
if is_compressed:
data = data[:-1]
return Key(
secret_exponent=from_bytes_32(data),
prefer_uncompressed=not is_compressed, netcode=netcode)
if key_type == 'address':
return Key(hash160=data, is_compressed=is_compressed, netcode=netcode)
raise EncodingError("unknown text: %s" % text)
[docs] @classmethod
def from_sec(class_, sec, netcode=None):
"""
Create a key from an sec bytestream (which is an encoding of a public pair).
"""
public_pair = sec_to_public_pair(sec)
return class_(public_pair=public_pair, is_compressed=is_sec_compressed(sec), netcode=netcode)
[docs] def is_private(self):
return self.secret_exponent() is not None
[docs] def secret_exponent(self):
"""
Return an integer representing the secret exponent (or None).
"""
return self._secret_exponent
[docs] def wif(self, use_uncompressed=None):
"""
Return the WIF representation of this key, if available.
If use_uncompressed is not set, the preferred representation is returned.
"""
wif_prefix = wif_prefix_for_netcode(self._netcode)
secret_exponent = self.secret_exponent()
if secret_exponent is None:
return None
return secret_exponent_to_wif(secret_exponent,
compressed=not self._use_uncompressed(use_uncompressed),
wif_prefix=wif_prefix)
[docs] def public_pair(self):
"""
Return a pair of integers representing the public key (or None).
"""
return self._public_pair
[docs] def netcode(self):
"""
Return the netcode
"""
return self._netcode
[docs] def sec(self, use_uncompressed=None):
"""
Return the SEC representation of this key, if available.
If use_uncompressed is not set, the preferred representation is returned.
"""
public_pair = self.public_pair()
if public_pair is None:
return None
return public_pair_to_sec(public_pair, compressed=not self._use_uncompressed(use_uncompressed))
[docs] def sec_as_hex(self, use_uncompressed=None):
"""
Return the SEC representation of this key as hex text.
If use_uncompressed is not set, the preferred representation is returned.
"""
sec = self.sec(use_uncompressed=use_uncompressed)
if sec is None:
return None
return b2h(sec)
[docs] def hash160(self, use_uncompressed=None):
"""
Return the hash160 representation of this key, if available.
If use_uncompressed is not set, the preferred representation is returned.
"""
use_uncompressed = self._use_uncompressed(use_uncompressed)
if self.public_pair() is None:
if use_uncompressed:
return self._hash160_uncompressed
return self._hash160_compressed
if use_uncompressed:
if self._hash160_uncompressed is None:
self._hash160_uncompressed = hash160(self.sec(use_uncompressed=use_uncompressed))
return self._hash160_uncompressed
if self._hash160_compressed is None:
self._hash160_compressed = hash160(self.sec(use_uncompressed=use_uncompressed))
return self._hash160_compressed
[docs] def address(self, use_uncompressed=None):
"""
Return the public address representation of this key, if available.
If use_uncompressed is not set, the preferred representation is returned.
"""
address_prefix = address_prefix_for_netcode(self._netcode)
hash160 = self.hash160(use_uncompressed=use_uncompressed)
if hash160:
return hash160_sec_to_bitcoin_address(hash160, address_prefix=address_prefix)
return None
bitcoin_address = address
[docs] def as_text(self):
"""
Return a textual representation of this key.
"""
if self.secret_exponent():
return self.wif()
sec_hex = self.sec_as_hex()
if sec_hex:
return sec_hex
return self.address()
[docs] def public_copy(self):
if self.secret_exponent() is None:
return self
return Key(public_pair=self.public_pair(), prefer_uncompressed=self._prefer_uncompressed,
is_compressed=(self._hash160_compressed is not None), netcode=self._netcode)
[docs] def subkey(self, path_to_subkey):
"""
Return the Key corresponding to the hierarchical wallet's subkey
"""
return self
[docs] def subkeys(self, path_to_subkeys):
"""
Return an iterator yielding Keys corresponding to the
hierarchical wallet's subkey path (or just this key).
"""
yield self
[docs] def sign(self, h):
"""
Return a der-encoded signature for a hash h.
Will throw a RuntimeError if this key is not a private key
"""
if not self.is_private():
raise RuntimeError("Key must be private to be able to sign")
val = from_bytes_32(h)
r, s = ecdsa.sign(ecdsa.generator_secp256k1, self.secret_exponent(), val)
return sigencode_der(r, s)
[docs] def verify(self, h, sig):
"""
Return whether a signature is valid for hash h using this key.
"""
val = from_bytes_32(h)
pubkey = self.public_pair()
rs = sigdecode_der(sig)
if self.public_pair() is None:
# find the pubkey from the signature and see if it matches
# our key
possible_pubkeys = ecdsa.possible_public_pairs_for_signature(
ecdsa.generator_secp256k1, val, rs)
hash160 = self.hash160()
for candidate in possible_pubkeys:
if hash160 == public_pair_to_hash160_sec(candidate, True):
pubkey = candidate
break
if hash160 == public_pair_to_hash160_sec(candidate, False):
pubkey = candidate
break
else:
# signature is using a pubkey that's not this key
return False
return ecdsa.verify(ecdsa.generator_secp256k1, pubkey, val, rs)
def _use_uncompressed(self, use_uncompressed=None):
if use_uncompressed:
return use_uncompressed
if use_uncompressed is None:
return self._prefer_uncompressed
return False
def __repr__(self):
r = self.public_copy().as_text()
if self.is_private():
return "private_for <%s>" % r
return "<%s>" % r