Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ATR = True Range with Wilder's MA #153

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 9 additions & 24 deletions talipp/indicators/ATR.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from typing import List, Any

from talipp.indicator_util import has_valid_values
from talipp.indicators.Indicator import Indicator, InputModifierType
from talipp.indicators.TrueRange import TrueRange
from talipp.input import SamplingPeriodType
from talipp.ohlcv import OHLCV
from talipp.ma import MAType, MAFactory


class ATR(Indicator):
Expand All @@ -19,40 +20,24 @@ class ATR(Indicator):
input_indicator: Input indicator.
input_modifier: Input modifier.
input_sampling: Input sampling type.
"""

"""
def __init__(self, period: int,
input_values: List[OHLCV] = None,
input_indicator: Indicator = None,
input_modifier: InputModifierType = None,
ma_type: MAType = MAType.WilderMA,
input_sampling: SamplingPeriodType = None):
super(ATR, self).__init__(input_modifier=input_modifier,
input_sampling=input_sampling)

self.period = period
self.tr = []

self.add_managed_sequence(self.tr)
self._tr = TrueRange()
self.add_sub_indicator(self._tr)

self._ma_tr = MAFactory.get_ma(ma_type, period, input_indicator=self._tr)

self.initialize(input_values, input_indicator)

def _calculate_new_value(self) -> Any:
high = self.input_values[-1].high
low = self.input_values[-1].low

if has_valid_values(self.input_values, 1, exact=True):
self.tr.append(high - low)
else:
close2 = self.input_values[-2].close
self.tr.append(max(
high - low,
abs(high - close2),
abs(low - close2),
))

if len(self.input_values) < self.period:
return None
elif len(self.input_values) == self.period:
return sum(self.tr) / self.period
else:
return (self.output_values[-1] * (self.period - 1) + self.tr[-1]) / self.period
return self._ma_tr.output_values[-1]
10 changes: 5 additions & 5 deletions talipp/indicators/CHOP.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import List, Any

from talipp.indicator_util import has_valid_values
from talipp.indicators.ATR import ATR
from talipp.indicators.TrueRange import TrueRange
from talipp.indicators.Indicator import Indicator, InputModifierType
from talipp.input import SamplingPeriodType
from talipp.ohlcv import OHLCV
Expand Down Expand Up @@ -33,19 +33,19 @@ def __init__(self, period: int,

self.period = period

self.atr = ATR(1)
self.add_sub_indicator(self.atr)
self.tr = TrueRange()
self.add_sub_indicator(self.tr)

self.initialize(input_values, input_indicator)

def _calculate_new_value(self) -> Any:
if not has_valid_values(self.atr, self.period) or not has_valid_values(self.input_values, self.period):
if not has_valid_values(self.tr, self.period) or not has_valid_values(self.input_values, self.period):
return None

max_high = max(self.input_values[-self.period:], key = lambda x: x.high).high
min_low = min(self.input_values[-self.period:], key = lambda x: x.low).low

if max_high != min_low:
return 100.0 * log10(sum(self.atr[-self.period:]) / (max_high - min_low) ) / log10(self.period)
return 100.0 * log10(sum(self.tr[-self.period:]) / (max_high - min_low) ) / log10(self.period)
else:
return None
45 changes: 45 additions & 0 deletions talipp/indicators/TrueRange.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from typing import List, Any

from talipp.indicator_util import has_valid_values
from talipp.indicators.Indicator import Indicator, InputModifierType
from talipp.input import SamplingPeriodType
from talipp.ohlcv import OHLCV


class TrueRange(Indicator):
"""True Range

Input type: [OHLCV][talipp.ohlcv.OHLCV]

Output type: `float`

Args:
input_values: List of input values.
input_indicator: Input indicator.
input_modifier: Input modifier.
input_sampling: Input sampling type.
"""

def __init__(self,
input_values: List[OHLCV] = None,
input_indicator: Indicator = None,
input_modifier: InputModifierType = None,
input_sampling: SamplingPeriodType = None):
super(TrueRange, self).__init__(input_modifier=input_modifier,
input_sampling=input_sampling)

self.initialize(input_values, input_indicator)

def _calculate_new_value(self) -> Any:
high = self.input_values[-1].high
low = self.input_values[-1].low

if has_valid_values(self.input_values, 1, exact=True):
return high - low
else:
close2 = self.input_values[-2].close
return max(
high - low,
abs(high - close2),
abs(low - close2),
)
10 changes: 5 additions & 5 deletions talipp/indicators/VTX.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import List, Any

from talipp.indicator_util import has_valid_values
from talipp.indicators.ATR import ATR
from talipp.indicators.TrueRange import TrueRange
from talipp.indicators.Indicator import Indicator, InputModifierType
from talipp.input import SamplingPeriodType
from talipp.ohlcv import OHLCV
Expand Down Expand Up @@ -53,8 +53,8 @@ def __init__(self, period: int,
self.minus_vm = []
self.add_managed_sequence(self.minus_vm)

self.atr = ATR(1)
self.add_sub_indicator(self.atr)
self.tr = TrueRange()
self.add_sub_indicator(self.tr)

self.initialize(input_values, input_indicator)

Expand All @@ -68,9 +68,9 @@ def _calculate_new_value(self) -> Any:
self.plus_vm.append(abs(value.high - value2.low))
self.minus_vm.append(abs(value.low - value2.high))

if not has_valid_values(self.atr, self.period) or not has_valid_values(self.plus_vm, self.period) or \
if not has_valid_values(self.tr, self.period) or not has_valid_values(self.plus_vm, self.period) or \
not has_valid_values(self.minus_vm, self.period):
return None

atr_sum = float(sum(self.atr[-self.period:]))
atr_sum = float(sum(self.tr[-self.period:]))
return VTXVal(sum(self.plus_vm[-self.period:]) / atr_sum, sum(self.minus_vm[-self.period:]) / atr_sum)
42 changes: 42 additions & 0 deletions talipp/indicators/WilderMA.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from typing import List, Any

from talipp.indicator_util import has_valid_values
from talipp.indicators.Indicator import Indicator, InputModifierType
from talipp.input import SamplingPeriodType


class WilderMA(Indicator):
"""Wilder's Moving Average.

Input type: `float`

Output type: `float`

Args:
period: Period.
input_values: List of input values.
input_indicator: Input indicator.
input_modifier: Input modifier.
input_sampling: Input sampling type.
"""

def __init__(self, period: int,
input_values: List[float] = None,
input_indicator: Indicator = None,
input_modifier: InputModifierType = None,
input_sampling: SamplingPeriodType = None):
super().__init__(input_modifier=input_modifier,
input_sampling=input_sampling)

self.period = period
self.k = 1.0 / self.period

self.initialize(input_values, input_indicator)

def _calculate_new_value(self) -> Any:
if len(self.input_values) < self.period:
return None
elif has_valid_values(self.input_values, self.period, exact=True):
return sum(self.input_values[-self.period:]) / self.period
else:
return float(self.k * self.input_values[-1] + (1.0 - self.k) * self.output_values[-1])
3 changes: 3 additions & 0 deletions talipp/indicators/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,14 @@
from .T3 import T3 as T3
from .TEMA import TEMA as TEMA
from .TRIX import TRIX as TRIX
from .TrueRange import TrueRange as TrueRange
from .TSI import TSI as TSI
from .TTM import TTM as TTM
from .UO import UO as UO
from .VTX import VTX as VTX
from .VWAP import VWAP as VWAP
from .VWMA import VWMA as VWMA
from .WilderMA import WilderMA as WilderMA
from .WMA import WMA as WMA
from .ZigZag import ZigZag as ZigZag
from .ZLEMA import ZLEMA as ZLEMA
Expand Down Expand Up @@ -113,6 +115,7 @@
"VTX",
"VWAP",
"VWMA",
"WilderMA",
"WMA",
"ZigZag",
"ZLEMA"
Expand Down
6 changes: 6 additions & 0 deletions talipp/ma.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from talipp.indicators.TEMA import TEMA
from talipp.indicators.TRIX import TRIX
from talipp.indicators.VWMA import VWMA
from talipp.indicators.WilderMA import WilderMA
from talipp.indicators.WMA import WMA
from talipp.indicators.ZLEMA import ZLEMA

Expand Down Expand Up @@ -54,6 +55,9 @@ class MAType(Enum):
VWMA = auto()
"""[Volume Weighted Moving Average][talipp.indicators.VWMA]"""

WilderMA = auto()
"""[Wilder's Moving Average][talipp.indicators.WMA]"""

WMA = auto()
"""[Weighted Moving Average][talipp.indicators.WMA]"""

Expand Down Expand Up @@ -101,6 +105,8 @@ def get_ma(ma_type: MAType,
return HMA(period=period, input_values=input_values, input_indicator=input_indicator, input_modifier=input_modifier)
elif ma_type == MAType.VWMA:
return VWMA(period=period, input_values=input_values, input_indicator=input_indicator, input_modifier=input_modifier)
elif ma_type == MAType.WilderMA:
return WilderMA(period=period, input_values=input_values, input_indicator=input_indicator, input_modifier=input_modifier)
elif ma_type == MAType.WMA:
return WMA(period=period, input_values=input_values, input_indicator=input_indicator, input_modifier=input_modifier)
elif ma_type == MAType.T3:
Expand Down
32 changes: 32 additions & 0 deletions test/test_WilderMA.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import unittest

from talipp.indicators import WilderMA

from TalippTest import TalippTest


class Test(TalippTest):
def setUp(self) -> None:
self.input_values = list(TalippTest.CLOSE_TMPL)

def test_init(self):
ind = WilderMA(5, self.input_values)

print(ind)

self.assertAlmostEqual(ind[-3], 9.699400, places = 5)
self.assertAlmostEqual(ind[-2], 9.805521, places = 5)
self.assertAlmostEqual(ind[-1], 9.844417, places = 5)

def test_update(self):
self.assertIndicatorUpdate(WilderMA(5, self.input_values))

def test_delete(self):
self.assertIndicatorDelete(WilderMA(5, self.input_values))

def test_purge_oldest(self):
self.assertIndicatorPurgeOldest(WilderMA(5, self.input_values))


if __name__ == '__main__':
unittest.main()
Loading