#!/usr/bin/python
#
# Copyright (C) 2009 Tan Swee Heng
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
__author__ = 'thesweeheng@gmail.com'
from gdata.finance.service import \
FinanceService, PortfolioQuery, PositionQuery
from gdata.finance import \
PortfolioEntry, PortfolioData, TransactionEntry, TransactionData, \
Price, Commission, Money
import datetime
import sys
def PrintReturns(pfx, d):
"""Print returns."""
print pfx, '%1.5f(1w) %1.5f(4w) %1.5f(3m) %1.5f(YTD)' % tuple(
float(i) for i in (d.return1w, d.return4w, d.return3m, d.returnYTD))
pfx = ' ' * len(pfx)
print pfx, '%1.5f(1y) %1.5f(3y) %1.5f(5y) %1.5f(overall)' % tuple(
float(i) for i in (d.return1y, d.return3y, d.return5y, d.return_overall))
PrRtn = PrintReturns
def PrintTransactions(transactions):
"""Print transactions."""
print " Transactions:"
fmt = ' %4s %-23s %-10s %6s %-11s %-11s'
print fmt % ('ID','Date','Type','Shares','Price','Commission')
for txn in transactions:
d = txn.transaction_data
print fmt % (txn.transaction_id, d.date or '----', d.type,
d.shares, d.price.money[0], d.commission.money[0])
if d.notes:
print " Notes:", d.notes
print
def PrintPosition(pos, with_returns=False):
"""Print single position."""
print ' Position :', pos.position_title
print ' Ticker ID :', pos.ticker_id
print ' Symbol :', pos.symbol
print ' Last updated :', pos.updated.text
d = pos.position_data
print ' Shares :', d.shares
if with_returns:
print ' Gain % :', d.gain_percentage
PrRtn(' Returns :', d)
print ' Cost basis :', d.cost_basis
print ' Days gain :', d.days_gain
print ' Gain :', d.gain
print ' Market value :', d.market_value
print
if pos.transactions:
print " \n"
PrintTransactions(pos.transactions)
print " \n"
def PrintPositions(positions, with_returns=False):
for pos in positions:
PrintPosition(pos, with_returns)
def PrintPortfolio(pfl, with_returns=False):
"""Print single portfolio."""
print 'Portfolio Title :', pfl.portfolio_title
print 'Portfolio ID :', pfl.portfolio_id
print ' Last updated :', pfl.updated.text
d = pfl.portfolio_data
print ' Currency :', d.currency_code
if with_returns:
print ' Gain % :', d.gain_percentage
PrRtn(' Returns :', d)
print ' Cost basis :', d.cost_basis
print ' Days gain :', d.days_gain
print ' Gain :', d.gain
print ' Market value :', d.market_value
print
if pfl.positions:
print " \n"
PrintPositions(pfl.positions, with_returns)
print " \n"
def PrintPortfolios(portfolios, with_returns=False):
for pfl in portfolios:
PrintPortfolio(pfl, with_returns)
def ShowCallDetails(meth):
def wrap(*args, **kwargs):
print '@', meth.__name__, args[1:], kwargs
meth(*args, **kwargs)
return wrap
class FinanceTester(object):
def __init__(self, email, password):
self.client = FinanceService(source='gdata-finance-test')
self.client.ClientLogin(email, password)
def GetPortfolios(self, with_returns=False, inline_positions=False):
query = PortfolioQuery()
query.returns = with_returns
query.positions = inline_positions
return self.client.GetPortfolioFeed(query=query).entry
def GetPositions(self, portfolio, with_returns=False, inline_transactions=False):
query = PositionQuery()
query.returns = with_returns
query.transactions = inline_transactions
return self.client.GetPositionFeed(portfolio, query=query).entry
def GetTransactions(self, position=None, portfolio=None, ticker=None):
if position:
feed = self.client.GetTransactionFeed(position)
elif portfolio and ticker:
feed = self.client.GetTransactionFeed(
portfolio_id=portfolio.portfolio_id, ticker_id=ticker)
return feed.entry
@ShowCallDetails
def TestShowDetails(self, with_returns=False, inline_positions=False,
inline_transactions=False):
portfolios = self.GetPortfolios(with_returns, inline_positions)
for pfl in portfolios:
PrintPortfolio(pfl, with_returns)
positions = self.GetPositions(pfl, with_returns, inline_transactions)
for pos in positions:
PrintPosition(pos, with_returns)
PrintTransactions(self.GetTransactions(pos))
def DeletePortfoliosByName(self, portfolio_titles):
for pfl in self.GetPortfolios():
if pfl.portfolio_title in portfolio_titles:
self.client.DeletePortfolio(pfl)
def AddPortfolio(self, portfolio_title, currency_code):
pfl = PortfolioEntry(portfolio_data=PortfolioData(
currency_code=currency_code))
pfl.portfolio_title = portfolio_title
return self.client.AddPortfolio(pfl)
def UpdatePortfolio(self, portfolio,
portfolio_title=None, currency_code=None):
if portfolio_title:
portfolio.portfolio_title = portfolio_title
if currency_code:
portfolio.portfolio_data.currency_code = currency_code
return self.client.UpdatePortfolio(portfolio)
def DeletePortfolio(self, portfolio):
self.client.DeletePortfolio(portfolio)
@ShowCallDetails
def TestManagePortfolios(self):
pfl_one = 'Portfolio Test: Emerging Markets 12345'
pfl_two = 'Portfolio Test: Renewable Energy 31415'
print '---- Deleting portfolios ----'
self.DeletePortfoliosByName([pfl_one, pfl_two])
PrintPortfolios(self.GetPortfolios())
print '---- Adding new portfolio ----'
pfl = self.AddPortfolio(pfl_one, 'SGD')
PrintPortfolios(self.GetPortfolios())
print '---- Changing portfolio title and currency code ----'
pfl = self.UpdatePortfolio(pfl, pfl_two, 'USD')
PrintPortfolios(self.GetPortfolios())
print '---- Deleting portfolio ----'
self.DeletePortfolio(pfl)
PrintPortfolios(self.GetPortfolios())
def Transact(self, type, portfolio, ticker, date=None, shares=None,
notes=None, price=None, commission=None, currency_code=None):
if price is not None:
price = Price(money=[Money(amount=str(price),
currency_code=currency_code or
portfolio.portfolio_data.currency_code)])
if commission is not None:
commission = Commission(money=[Money(amount=str(comission),
currency_code=currency_code or
portfolio.portfolio_data.currency_code)])
if date is not None and isinstance(date, datetime.datetime):
date = date.isoformat()
if shares is not None:
shares = str(shares)
txn = TransactionEntry(transaction_data=TransactionData(type=type,
date=date, shares=shares, notes=notes, price=price,
commission=commission))
return self.client.AddTransaction(txn,
portfolio_id=portfolio.portfolio_id, ticker_id=ticker)
def Buy(self, portfolio, ticker, **kwargs):
return self.Transact('Buy', portfolio, ticker, **kwargs)
def Sell(self, portfolio, ticker, **kwargs):
return self.Transact('Sell', portfolio, ticker, **kwargs)
def GetPosition(self, portfolio, ticker, with_returns=False, inline_transactions=False):
query = PositionQuery()
query.returns = with_returns
query.transactions = inline_transactions
return self.client.GetPosition(
portfolio_id=portfolio.portfolio_id, ticker_id=ticker, query=query)
def DeletePosition(self, position):
self.client.DeletePosition(position_entry=position)
def UpdateTransaction(self, transaction):
self.client.UpdateTransaction(transaction)
def DeleteTransaction(self, transaction):
self.client.DeleteTransaction(transaction)
@ShowCallDetails
def TestManageTransactions(self):
pfl_title = 'Transaction Test: Technology 27182'
self.DeletePortfoliosByName([pfl_title])
print '---- Adding new portfolio ----'
pfl = self.AddPortfolio(pfl_title, 'USD')
PrintPortfolios(self.GetPortfolios())
print '---- Adding buy transactions ----'
tkr1 = 'NASDAQ:GOOG'
date = datetime.datetime(2009,04,01)
days = datetime.timedelta(1)
txn1 = self.Buy(pfl, tkr1, shares=500, price=321.00, date=date)
txn2 = self.Buy(pfl, tkr1, shares=150, price=312.00, date=date+15*days)
pos = self.GetPosition(portfolio=pfl, ticker=tkr1, with_returns=True)
PrintPosition(pos, with_returns=True)
PrintTransactions(self.GetTransactions(pos))
print '---- Adding sell transactions ----'
txn3 = self.Sell(pfl, tkr1, shares=400, price=322.00, date=date+30*days)
txn4 = self.Sell(pfl, tkr1, shares=200, price=330.00, date=date+45*days)
pos = self.GetPosition(portfolio=pfl, ticker=tkr1, with_returns=True)
PrintPosition(pos, with_returns=True)
PrintTransactions(self.GetTransactions(pos))
print "---- Modifying first and deleting third ----"
txn1.transaction_data.shares = '400.0'
self.UpdateTransaction(txn1)
self.DeleteTransaction(txn3)
pos = self.GetPosition(portfolio=pfl, ticker=tkr1, with_returns=True)
PrintPosition(pos, with_returns=True)
PrintTransactions(self.GetTransactions(pos))
print "---- Deleting position ----"
print "Number of positions (before):", len(self.GetPositions(pfl))
self.DeletePosition(pos)
print "Number of positions (after) :", len(self.GetPositions(pfl))
print '---- Deleting portfolio ----'
self.DeletePortfolio(pfl)
PrintPortfolios(self.GetPortfolios())
if __name__ == '__main__':
try:
email = sys.argv[1]
password = sys.argv[2]
cases = sys.argv[3:]
except IndexError:
print "Usage: test_finance account@google.com password [0 1 2...]"
sys.exit(1)
tester = FinanceTester(email, password)
tests = [
tester.TestShowDetails,
lambda: tester.TestShowDetails(with_returns=True),
tester.TestManagePortfolios,
tester.TestManageTransactions,
lambda: tester.TestShowDetails(with_returns=True, inline_positions=True),
lambda: tester.TestShowDetails(with_returns=True, inline_positions=True,
inline_transactions=True),]
if not cases:
cases = range(len(tests))
for i in cases:
print "===== TEST CASE", i, "="*50
tests[int(i)]()