Coverage for mt940/models.py: 0%
177 statements
« prev ^ index » next coverage.py v7.2.7, created at 2025-03-01 10:17 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2025-03-01 10:17 +0000
1import re
2import decimal
3import datetime
5# python 3.8+ compatibility
6try: # pragma: no cover
7 from collections import abc
8except ImportError: # pragma: no cover
9 import collections as abc
11import mt940
13from . import _compat
14from . import processors
class Model(object):
    '''Common base class of the model objects defined in this module

    It only supplies a default ``repr`` that shows the name of the concrete
    class wrapped in angle brackets.
    '''

    def __repr__(self):
        # type(self) resolves to the concrete subclass, so every model
        # reports its own class name
        return '<{0}>'.format(type(self).__name__)
class FixedOffset(datetime.tzinfo):
    '''Timezone with a constant UTC offset expressed in minutes

    Based on the example in the Python docs:
    https://docs.python.org/2/library/datetime.html#tzinfo-objects

    >>> offset = FixedOffset(60)
    >>> offset.utcoffset(None).total_seconds()
    3600.0
    >>> offset.dst(None)
    datetime.timedelta(0)
    >>> offset.tzname(None)
    '60'

    Args:
        offset (int or str): Offset from UTC in minutes, anything that is
            not an int is converted with ``int()``
        name (str): Name of the timezone, falls back to ``str(offset)`` when
            empty or not given
    '''

    def __init__(self, offset=0, name=None):
        # The fallback name is built from the offset exactly as passed in,
        # before it gets coerced to an integer
        self._name = name if name else str(offset)

        minutes = offset if isinstance(offset, int) else int(offset)
        self._offset = datetime.timedelta(minutes=minutes)

    def utcoffset(self, dt):
        '''Return the fixed offset, `dt` is not used'''
        return self._offset

    def dst(self, dt):
        '''Return a zero timedelta, `dt` is not used'''
        return datetime.timedelta(0)

    def tzname(self, dt):
        '''Return the timezone name, `dt` is not used'''
        return self._name
class DateTime(datetime.datetime, Model):
    '''Just a regular datetime object which supports dates given as strings

    >>> DateTime(year='2000', month='1', day='2', hour='3', minute='4',
    ...          second='5', microsecond='6')
    DateTime(2000, 1, 2, 3, 4, 5, 6)

    >>> DateTime(year='123', month='1', day='2', hour='3', minute='4',
    ...          second='5', microsecond='6')
    DateTime(2123, 1, 2, 3, 4, 5, 6)

    >>> DateTime(2000, 1, 2, 3, 4, 5, 6)
    DateTime(2000, 1, 2, 3, 4, 5, 6)

    >>> DateTime(year='123', month='1', day='2', hour='3', minute='4',
    ...          second='5', microsecond='6', tzinfo=FixedOffset('60'))
    DateTime(2123, 1, 2, 3, 4, 5, 6, tzinfo=<mt940.models.FixedOffset ...>)

    An explicit `tzinfo` wins over `offset`:

    >>> dt = DateTime(year='2000', month='1', day='2',
    ...               tzinfo=FixedOffset(60), offset='120')
    >>> dt.utcoffset().total_seconds()
    3600.0

    When keyword arguments are given the positional arguments are ignored
    and `year`, `month` and `day` are required. Without keyword arguments
    the positional arguments are passed to `datetime.datetime` unchanged.

    Args:
        year (str): Year, values below 1000 automatically get 2000 added
        month (str): Month
        day (str): Day
        hour (str): Hour, defaults to 0
        minute (str): Minute, defaults to 0
        second (str): Second, defaults to 0
        microsecond (str): Microsecond, defaults to 0
        tzinfo (tzinfo): Timezone information. Overwrites `offset`
        offset (str): Timezone offset in minutes, generates a tzinfo object
            with the given offset if no tzinfo is available.

    Raises:
        AssertionError: If `year`, `month` or `day` is missing or None
            while other keyword arguments are given
    '''

    def __new__(cls, *args, **kwargs):
        if not kwargs:
            # Plain positional construction, also used when unpickling
            return datetime.datetime.__new__(cls, *args, **kwargs)

        # Field name and default, `None` marks a required field
        fields = (
            ('year', None),
            ('month', None),
            ('day', None),
            ('hour', '0'),
            ('minute', '0'),
            ('second', '0'),
            ('microsecond', '0'),
        )

        values = {}
        for key, default in fields:
            # Fetch the value or the default
            value = kwargs.get(key, default)

            # Raised explicitly instead of using `assert` so the check is
            # not stripped when Python runs with -O, the exception type is
            # kept for backwards compatibility
            if value is None:
                raise AssertionError('%s should not be None' % key)

            # Convert the value to integer and force base 10 to make sure
            # it doesn't get recognized as octal
            if not isinstance(value, int):
                value = int(value, 10)

            values[key] = value

        # Short years are interpreted as years in the 2000s
        if values['year'] < 1000:
            values['year'] += 2000

        # As documented an explicit tzinfo takes precedence, the offset is
        # only used to generate a tzinfo when none was given
        if kwargs.get('tzinfo'):
            values['tzinfo'] = kwargs['tzinfo']
        elif kwargs.get('offset'):
            values['tzinfo'] = FixedOffset(kwargs['offset'])
        else:
            values['tzinfo'] = None

        return datetime.datetime.__new__(cls, **values)
class Date(datetime.date, Model):
    '''A regular date object that can also be built from string values

    >>> Date(year='2000', month='1', day='2')
    Date(2000, 1, 2)

    >>> Date(year='123', month='1', day='2')
    Date(2123, 1, 2)

    Keyword arguments are converted by :py:class:`DateTime`, positional
    arguments are handed to `datetime.date` unchanged.

    Args:
        year (str): Year, 2000 is added automatically when needed
        month (str): Month
        day (str): Day
    '''

    def __new__(cls, *args, **kwargs):
        if not kwargs:
            # Nothing to convert, behave exactly like datetime.date
            return datetime.date.__new__(cls, *args, **kwargs)

        # Let DateTime do the string parsing and only keep the date part
        parsed = DateTime(*args, **kwargs).date()
        return datetime.date.__new__(
            cls, parsed.year, parsed.month, parsed.day)
class Amount(Model):
    '''Amount object containing currency and amount

    Args:
        amount (str): Amount using either a , or a . as decimal separator
        status (str): Either C or D for credit or debit respectively, a
            debit is stored as a negative amount
        currency (str): A 3 letter currency (e.g. EUR)

    Additional keyword arguments are accepted and ignored.

    >>> Amount('123.45', 'C', 'EUR')
    <123.45 EUR>
    >>> Amount('123.45', 'D', 'EUR')
    <-123.45 EUR>
    >>> Amount('123.45', 'C', 'EUR') == Amount('123,45', 'C', 'EUR')
    True
    >>> Amount('123.45', 'C', 'EUR') == 'something else'
    False
    '''

    def __init__(self, amount, status, currency=None, **kwargs):
        # Normalize the decimal separator before parsing
        self.amount = decimal.Decimal(amount.replace(',', '.'))
        self.currency = currency

        # C = credit, D = debit
        if status == 'D':
            self.amount = -self.amount

    def __eq__(self, other):
        # Objects without an amount and currency (e.g. None) are simply not
        # equal. Returning NotImplemented lets Python fall back to its
        # default comparison instead of raising an AttributeError
        try:
            return (self.amount == other.amount
                    and self.currency == other.currency)
        except AttributeError:
            return NotImplemented

    def __str__(self):
        return '%s %s' % (self.amount, self.currency)

    def __repr__(self):
        return '<%s>' % self
class SumAmount(Amount):
    '''Amount which additionally stores a number given by the caller

    Args:
        number: Required keyword argument, stored unchanged as
            ``self.number`` and shown as the statement count in the repr

    All other arguments are passed on to :py:class:`Amount`.

    Raises:
        KeyError: If the `number` keyword argument is missing
    '''

    def __init__(self, *args, **kwargs):
        # Take `number` out of the keyword arguments before handing the
        # rest to Amount
        number = kwargs.pop('number')
        super(SumAmount, self).__init__(*args, **kwargs)
        self.number = number

    def __repr__(self):
        # The original format string ended in a stray ')' which produced an
        # unbalanced representation such as '<1.00 EUR in 2 stmts)>'
        return '<%s %s in %s stmts>' % (self.amount, self.currency,
                                        self.number)
class Balance(Model):
    '''Parse balance statement

    Args:
        status (str): Either C or D for credit or debit respectively
        amount (Amount): Object containing the amount and currency. Any
            other non-empty value is converted to an :py:class:`Amount`
            using `status` and the optional `currency` keyword argument
        date (date): The balance date

    >>> balance = Balance('C', '0.00', Date(2010, 7, 22))
    >>> balance.status
    'C'
    >>> balance.amount.amount
    Decimal('0.00')
    >>> isinstance(balance.date, Date)
    True
    >>> balance.date.year, balance.date.month, balance.date.day
    (2010, 7, 22)

    >>> Balance()
    <None @ None>

    >>> Balance('C', '0.00') == 'something else'
    False
    '''

    def __init__(self, status=None, amount=None, date=None, **kwargs):
        # Wrap raw amounts, empty values and Amount instances are stored
        # as they are
        if amount and not isinstance(amount, Amount):
            amount = Amount(amount, status, kwargs.get('currency'))

        self.status = status
        self.amount = amount
        self.date = date

    def __eq__(self, other):
        # Only the amount and the status are compared, the date is ignored.
        # Comparing with an object that lacks these attributes (or comparing
        # a filled amount with an empty one) used to raise AttributeError.
        # Returning NotImplemented makes Python treat them as not equal
        try:
            return (self.amount == other.amount
                    and self.status == other.status)
        except AttributeError:
            return NotImplemented

    def __repr__(self):
        return '<%s>' % self

    def __str__(self):
        return '%s @ %s' % (
            self.amount,
            self.date, )
class Transactions(abc.Sequence):
    '''
    Collection of :py:class:`Transaction` objects with global properties such
    as begin and end balance

    The object behaves as a read-only sequence over the parsed transactions,
    the statement-wide values are collected in the `data` dictionary.

    Args:
        processors (dict): Optional mapping of processor name to a list of
            callables, entries replace those in `DEFAULT_PROCESSORS`
        tags (dict): Optional mapping of tag id to tag, entries replace
            those returned by :py:meth:`defaultTags`
    '''

    #: Using the processors you can pre-process data before creating objects
    #: and modify them after creating the objects
    DEFAULT_PROCESSORS = dict(
        pre_account_identification=[],
        post_account_identification=[],
        pre_available_balance=[],
        post_available_balance=[],
        pre_closing_balance=[],
        post_closing_balance=[],
        pre_intermediate_closing_balance=[],
        post_intermediate_closing_balance=[],
        pre_final_closing_balance=[],
        post_final_closing_balance=[],
        pre_forward_available_balance=[],
        post_forward_available_balance=[],
        pre_opening_balance=[],
        post_opening_balance=[],
        pre_intermediate_opening_balance=[],
        post_intermediate_opening_balance=[],
        pre_final_opening_balance=[],
        post_final_opening_balance=[],
        pre_related_reference=[],
        post_related_reference=[],
        pre_statement=[processors.date_fixup_pre_processor],
        post_statement=[
            processors.date_cleanup_post_processor,
            processors.transactions_to_transaction('transaction_reference'),
        ],
        pre_statement_number=[],
        post_statement_number=[],
        pre_non_swift=[],
        post_non_swift=[],
        pre_transaction_details=[],
        post_transaction_details=[
            processors.transaction_details_post_processor
            # processors.transaction_details_post_processor_with_space
        ],
        pre_transaction_reference_number=[],
        post_transaction_reference_number=[],
        pre_floor_limit_indicator=[],
        post_floor_limit_indicator=[],
        pre_date_time_indication=[],
        post_date_time_indication=[],
        pre_sum_credit_entries=[],
        post_sum_credit_entries=[],
        pre_sum_debit_entries=[],
        post_sum_debit_entries=[])

    def __getstate__(self):  # pragma: no cover
        # Processors are not always safe to dump so ignore them entirely
        state = self.__dict__.copy()
        del state['processors']
        return state

    def __init__(self, processors=None, tags=None):
        # Shallow copies so updating the mappings below does not touch the
        # class level defaults
        self.processors = self.DEFAULT_PROCESSORS.copy()
        self.tags = Transactions.defaultTags().copy()

        if processors:
            self.processors.update(processors)
        if tags:
            self.tags.update(tags)

        self.transactions = []
        self.data = {}

    @property
    def currency(self):
        '''Currency of the first available balance or floor limit

        Returns None when none of the known entries is available.
        '''
        balance = mt940.utils.coalesce(
            self.data.get('final_opening_balance'),
            self.data.get('opening_balance'),
            self.data.get('intermediate_opening_balance'),
            self.data.get('available_balance'),
            self.data.get('forward_available_balance'),
            self.data.get('final_closing_balance'),
            self.data.get('closing_balance'),
            self.data.get('intermediate_closing_balance'),
            self.data.get('c_floor_limit'),
            self.data.get('d_floor_limit'), )

        if balance:
            # Amount objects carry the currency themselves, other entries
            # hold it on their `amount` attribute
            if isinstance(balance, Amount):
                return balance.currency

            return balance.amount.currency

    @staticmethod
    def defaultTags():
        '''Return the tag mapping that is used when no tags are given'''
        return mt940.tags.TAG_BY_ID

    @classmethod
    def strip(cls, lines):
        '''Yield the non-empty lines without trailing whitespace

        Carriage returns are removed and lines consisting of only a `-`
        separator are skipped.

        Args:
            lines (iterable of str): The lines to clean up
        '''
        for line in lines:
            # We don't like carriage returns in case of Windows files so let's
            # just replace them with nothing
            line = line.replace('\r', '')

            # Strip trailing whitespace from lines since they cause incorrect
            # files
            line = line.rstrip()

            # Skip separators
            if line.strip() == '-':
                continue

            # Return actual lines
            if line:
                yield line

    @classmethod
    def normalize_tag_id(cls, tag_id):
        '''Convert numeric tag ids to int, other tag ids are returned as is

        Args:
            tag_id (str): The tag id as found in the data
        '''
        # Since non-digit tags exist, make the conversion optional
        if tag_id.isdigit():
            tag_id = int(tag_id)

        return tag_id

    def sanitize_tag_id_matches(self, matches):
        '''Yield only the matches that start a known tag

        Tag-like line beginnings with an unknown tag id that follow a
        transaction details tag are rejected, so their text stays part of
        the details.

        Args:
            matches (list): Regex matches with a `tag` group, in order of
                appearance

        Raises:
            AssertionError: If an unknown tag is found that does not follow
                a transaction details tag
        '''
        i_next = 0
        for i, match in enumerate(matches):
            # match was rejected
            if i < i_next:
                continue

            # next match would be
            i_next = i + 1

            # normalize tag id
            tag_id = self.normalize_tag_id(match.group('tag'))

            # tag should be known, raised explicitly instead of using
            # `assert` so the check is not stripped when running with -O
            if tag_id not in self.tags:
                raise AssertionError(
                    'Unknown tag %r in line: %r' % (tag_id, match.group(0)))

            # special treatment for long tag content with possible
            # bad line wrap which produces tag_id like line beginnings
            # seen with :86: tag
            if tag_id == mt940.tags.Tags.TRANSACTION_DETAILS.value.id:
                # search subsequent tags for unknown tag ids
                # these lines likely belong to the previous tag
                for j in range(i_next, len(matches)):
                    next_tag_id = self.normalize_tag_id(
                        matches[j].group('tag'))
                    if next_tag_id in self.tags:
                        # this one is the next valid match
                        i_next = j
                        break
                    # else reject match
                else:
                    # No known tag follows anymore so all of the remaining
                    # matches belong to this tag. Without this the first
                    # of them would be reported as an unknown tag
                    i_next = len(matches)

            # a valid match
            yield match

    def parse(self, data):
        '''Parses mt940 data, expects a string with data

        The results are added to this object, calling the method again
        continues with the transactions and data that are already present.

        Args:
            data (str): The MT940 data

        Returns: :py:class:`list` of :py:class:`Transaction`
        '''
        # Remove extraneous whitespace and such
        data = '\n'.join(self.strip(data.split('\n')))

        # The pattern is a bit annoying to match by regex, even with a greedy
        # match it's difficult to get both the beginning and the end so we're
        # working around it in a safer way to get everything.
        tag_re = re.compile(
            r'^:\n?(?P<full_tag>(?P<tag>[0-9]{2}|NS)(?P<sub_tag>[A-Z])?):',
            re.MULTILINE)
        matches = list(tag_re.finditer(data))

        # identify valid matches
        valid_matches = list(self.sanitize_tag_id_matches(matches))

        # Continue with the last known transaction if there is one. Without
        # this, parsing a second time with the same object fails with an
        # UnboundLocalError because `transaction` was only assigned while
        # the list of transactions was still empty
        transaction = self.transactions[-1] if self.transactions else None

        for i, match in enumerate(valid_matches):
            tag_id = self.normalize_tag_id(match.group('tag'))

            # get tag instance corresponding to tag id
            tag = self.tags.get(match.group('full_tag')) \
                or self.tags[tag_id]

            # Nice trick to get all the text that is part of this tag, python
            # regex matches have a `end()` and `start()` to indicate the start
            # and end index of the match.
            if valid_matches[i + 1:i + 2]:
                tag_data = \
                    data[match.end():valid_matches[i + 1].start()].strip()
            else:
                tag_data = data[match.end():].strip()

            tag_dict = tag.parse(self, tag_data)

            # Preprocess data before creating the object
            for processor in self.processors.get('pre_%s' % tag.slug, []):
                tag_dict = processor(self, tag, tag_dict)

            result = tag(self, tag_dict)

            # Postprocess the object
            for processor in self.processors.get('post_%s' % tag.slug, []):
                result = processor(self, tag, tag_dict, result)

            # Creating a new transaction for :20: and :61: tags allows the
            # tags from :20: to :61: to be captured as part of the transaction.
            if isinstance(tag, mt940.tags.Statement):
                # Transactions only get a Transaction Reference Code ID from a
                # :61: tag which is why a new transaction is created if the
                # 'id' has a value.
                if not self.transactions:
                    transaction = Transaction(self)
                    self.transactions.append(transaction)

                if transaction.data.get('id'):
                    transaction = Transaction(self, result)
                    self.transactions.append(transaction)
                else:
                    transaction.data.update(result)
            elif issubclass(tag.scope, Transaction) and self.transactions:
                # Combine multiple results together as one string, Rabobank has
                # multiple :86: tags for a single transaction
                for k, v in _compat.iteritems(result):
                    if k in transaction.data and hasattr(v, 'strip'):
                        transaction.data[k] += '\n%s' % v.strip()
                    else:
                        transaction.data[k] = v

            elif issubclass(tag.scope, Transactions):  # pragma: no branch
                self.data.update(result)

        return self.transactions

    def __getitem__(self, key):
        return self.transactions[key]

    def __len__(self):
        return len(self.transactions)

    def __repr__(self):
        # Only the balance entries are shown, without the `_balance` part
        # of their name
        return '<%s[%s]>' % (
            self.__class__.__name__,
            ']['.join('%s: %s' % (k.replace('_balance', ''), v)
                      for k, v in _compat.iteritems(self.data)
                      if k.endswith('balance')))
class Transaction(Model):
    '''A single transaction with its values stored in the `data` dict

    Args:
        transactions: The collection this transaction belongs to, stored as
            ``self.transactions``
        data (dict): Optional initial values for the transaction
    '''

    def __init__(self, transactions, data=None):
        self.transactions = transactions
        self.data = {}
        self.update(data)

    def update(self, data):
        '''Merge `data` into the transaction, empty values are ignored'''
        if not data:
            return

        self.data.update(data)

    def __repr__(self):
        # Missing entries are shown as None
        date = self.data.get('date')
        amount = self.data.get('amount')
        return '<%s[%s] %s>' % (type(self).__name__, date, amount)