#!/usr/bin/env python
# -*- coding: UTF-8 -*-
__author__="Scott Hendrickson"
__license__="Simplified BSD"
import sys
import datetime
import fileinput
from StringIO import StringIO
# Experimental: use numba to speed up some of the basic functions
# that are run many times per record
# from numba import jit
# use fastest JSON option available: ujson, then the stdlib json, then simplejson
try:
    import ujson as json
except ImportError:
    try:
        import json
    except ImportError:
        import simplejson as json

gnipError = "GNIPERROR"
gnipRemove = "GNIPREMOVE"
# Timestamp computed once, when the module is imported. The format carries a
# literal "Z" (UTC designator), so it must be taken from UTC, not local time.
gnipDateTime = datetime.datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z")
# Placeholder for missing values; AcsCSV.asString() replaces it with the
# caller-supplied emptyField string.
INTERNAL_EMPTY_FIELD = "GNIPEMPTYFIELD"
class _Field(object):
    """
    Base class for extracting the desired value at the end of a series of keys in a JSON Activity
    Streams payload. Set the application-wide default value (for e.g. missing values) here,
    but also use child classes to override when necessary. Subclasses also need to define the
    key-path (path) to the desired location by overwriting the path attr.
    """
    # set some default values; these can be overwritten in custom classes
    # twitter format
    default_t_fmt = "%Y-%m-%dT%H:%M:%S.000Z"
    default_value = INTERNAL_EMPTY_FIELD
    path = []  # dict key-path to follow for desired value (shared; never mutated here)
    label = 'DummyKeyPathLabel'  # this must match if-statement in constructor
    # text type used by __repr__ and fix_length: unicode on Python 2, str on Python 3
    _text_type = type(u"")

    def __init__(self, json_record):
        """
        Extract this field from json_record (a decoded JSON record, or None).

        If the subclass kept the placeholder label, it is derived from the
        key-path, e.g. path ["actor", "id"] gives label "actor:id".
        self.value is the result of walk_path(), or self.default_value when
        json_record is None.
        """
        if self.label == 'DummyKeyPathLabel':
            self.label = ':'.join(self.path)
        self.value = None  # str representation of the field, often = str( self.value_list )
        if json_record is not None:
            self.value = self.walk_path(json_record)
        else:
            self.value = self.default_value

    def __repr__(self):
        return self._text_type(self.value)

    def walk_path(self, json_record, path=None):
        """
        Follow the key-path (self.path unless path is given) down through
        json_record and return the value found at its end.

        Returns self.default_value when:
          - a key is missing,
          - a key maps to an empty list (common for e.g. twitter_entities),
          - an intermediate value is not a dict (e.g. a string or number),
          - the value found is JSON null (Python None).
        """
        if path is None:
            path = self.path
        res = json_record
        for k in path:
            if res is None:
                break
            if not isinstance(res, dict):
                # `k in res` would be a substring test on a string and a
                # TypeError on a number; neither can contain the next key
                return self.default_value
            if k not in res:
                return self.default_value
            child = res[k]
            if isinstance(child, list) and not child:
                # parenthetical case for values with empty lists e.g. twitter_entities
                return self.default_value
            res = child
        # JSON null converts to Python None; report it as the default value
        return res if res is not None else self.default_value

    def walk_path_slower(self, json_record, path=None):
        """
        Deprecated alternative to walk_path(), kept for backward compatibility.

        Subscripts json_record with each key of the key-path in turn. Any
        KeyError/IndexError/TypeError along the way, or a final value of None,
        yields self.default_value. Unlike walk_path(), empty lists are
        returned as-is.
        """
        if path is None:
            path = self.path
        res = json_record
        try:
            # Plain subscripting replaces the former exec() of generated source,
            # which could be subverted by keys containing quotes/brackets and
            # never assigned `res` on Python 3.
            for k in path:
                res = res[k]
        except (KeyError, IndexError, TypeError):
            res = None
        if res is None:
            res = self.default_value
        return res

    def fix_length(self, iterable, limit=None):
        """
        Take an iterable (typically a list) and an optional maximum length (limit).

        If limit is None, the input is returned unchanged. Otherwise:
          - an input equal to self.default_value (what walk_path() returns for a
            missing key) or an empty input becomes a list of limit copies of
            self.default_value;
          - an input with at least limit items is truncated to its first limit items;
          - a shorter list or tuple is copied into a new list padded with
            self.default_value up to size limit.
        Strings are iterables too: a string longer than limit is truncated to
        limit characters, and a shorter string is returned unchanged (it cannot
        be padded with list items).
        """
        if limit is None:
            # no limits on the length of the result, so just return the original iterable
            return iterable
        if iterable == self.default_value or len(iterable) == 0:
            # walk_path() found nothing useful (or an empty list): fill with defaults
            return [self.default_value] * limit
        current_length = len(iterable)
        if current_length >= limit:
            return iterable[:limit]
        if isinstance(iterable, (str, self._text_type)):
            # concatenating a list onto a string would raise TypeError
            return iterable
        return list(iterable) + [self.default_value] * (limit - current_length)
class _LimitedField(_Field):
    """
    Takes JSON record (in python dict form) and optionally a maximum length (limit,
    default 1). Uses parent class _Field() to assign the appropriate value
    to self.value. When self.value is a list of dictionaries,
    inheriting from _LimitedField() class allows for the extraction and combination of
    an arbitrary number of fields within self.value into self.value_list.
    Ex: if your class would lead to having
    self.value = [ {'a': 1, 'b': 2, 'c': 3}, {'a': 4, 'b': 5, 'c': 6} ], and what you'd like
    is a list that looks like [ 1, 2, 4, 5 ], inheriting from _LimitedField() allows you
    to overwrite the fields list ( fields=["a", "b"] ) to obtain this result.
    Finally, self.value is set to a string representation of the final self.value_list.
    """
    # TODO:
    # - consolidate _LimitedField() & the fix_length() method in _Field if possible
    # - set limit=None by default and just return as many as there are, otherwise
    #   (by specifying limit), return a maximum of limit.

    # Keys to extract from each dict in self.value; subclasses override this,
    # e.g. fields = ["a", "b"]. A falsy value (the default) skips extraction and
    # leaves self.value exactly as walk_path() found it.
    fields = None

    def __init__(self, json_record, limit=1):
        """
        Build self.value_list from the first `limit` dicts in self.value.

        value_list holds len(fields) * limit entries, filled row by row: entry
        len(fields)*i + j is item i's value for fields[j]. Missing items, missing
        keys and non-dict items leave self.default_value in their slots.
        self.value is then replaced by str(self.value_list).
        """
        # NOTE: self.fields must not be assigned here; an instance attribute
        # would shadow the subclass's class-level `fields` and disable extraction.
        super(_LimitedField, self).__init__(json_record)
        fields = self.fields
        if not fields:
            return
        width = len(fields)
        # start with default list full of the default_values
        self.value_list = [self.default_value] * (width * limit)
        if self.value != self.default_value:
            # self.value is expected to be a list of dicts (e.g. one per activity media object)
            for i, item in enumerate(self.value):
                if i >= limit:
                    break
                if not isinstance(item, dict):
                    # keep the placeholders for this row instead of raising
                    continue
                for j, key in enumerate(fields):
                    self.value_list[width * i + j] = item.get(key, self.default_value)
        # finally, str-ify the list
        self.value = str(self.value_list)
class AcsCSV(object):
    """Base class for all delimited list objects. Basic delimited list utility functions"""
    # text type used by keyPath() and buildListString(): unicode on Python 2,
    # str on Python 3
    _text_type = type(u"")

    def __init__(self, delim, options_keypath):
        """
        delim: output field delimiter; cleanField() also replaces it with a
            space inside field values. An empty delimiter only triggers a warning.
        options_keypath: optional run-time key path; when truthy,
            get_source_list() appends keyPath() of each record.
        """
        self.delim = delim
        if delim == "":
            sys.stderr.write("Warning - Output has Null delimiter\n")
        # not read in this class; presumably used by subclasses -- TODO confirm
        self.rmchars = "\n\r {}".format(self.delim)
        self.options_keypath = options_keypath

    def string_hook(self, record_string, mode_dummy):
        """
        Returns a file-like StringIO object built from the activity record in record_string.
        This is ultimately passed down to the FileInput.readline() method. The mode_dummy
        parameter is only included so the signature matches other hooks.
        """
        return StringIO(record_string)

    def file_reader(self, options_filename=None, json_string=None):
        """
        Read arbitrary input file(s) or standard Python str. When passing file_reader() a
        JSON string, assign it to the json_string arg. Yields a tuple of (line number, record).

        A line holding several records glued together ("}{") is split and each
        record is yielded with the same line number. Lines that still fail to
        parse are reported on stderr and skipped, as are empty/falsy records
        ({}, [], null). self.cnt is set to the current line number before each yield.
        """
        if json_string is not None:
            hook = self.string_hook
            options_filename = json_string
        else:
            hook = fileinput.hook_compressed
        line_number = 0
        reader = fileinput.FileInput(options_filename, openhook=hook)
        try:
            for raw_line in reader:
                line_number += 1
                line = raw_line.strip()
                try:
                    recs = [json.loads(line)]
                except ValueError:
                    try:
                        # maybe a missing line feed between concatenated records?
                        recs = [json.loads(chunk) for chunk in
                                line.replace("}{", "}GNIP_SPLIT{").split("GNIP_SPLIT")]
                    except ValueError:
                        sys.stderr.write("Invalid JSON record (%d) %s, skipping\n"
                                         % (line_number, line))
                        continue
                for record in recs:
                    # `not record` also covers null/number records, where len() would raise
                    if not record:
                        continue
                    # hack: let the old source modules still have a self.cnt for error msgs
                    self.cnt = line_number
                    yield line_number, record
        finally:
            # FileInput is not a context manager on Python 2; close it explicitly,
            # which also runs when the consumer abandons the generator early
            reader.close()

    def cleanField(self, f):
        """
        Clean a field of newlines and the output delimiter.

        Strings are stripped, and "\\n", "\\r" and the delimiter are replaced by
        spaces (the delimiter only when it is non-empty). Numbers become str(f);
        anything else becomes INTERNAL_EMPTY_FIELD.
        """
        try:
            res = f.strip().replace("\n", " ").replace("\r", " ")
        except AttributeError:
            # not a string: odd edge case that f is a number, then can't call
            # string functions
            try:
                float(f)
            except (TypeError, ValueError):
                return INTERNAL_EMPTY_FIELD
            return str(f)
        if self.delim:
            # replace("", " ") would insert a space between every character
            res = res.replace(self.delim, " ")
        return res

    def buildListString(self, l):
        """
        Generic list builder returns a string representation of list.

        Every element is wrapped in single quotes (nested lists recursively) and
        elements are comma-joined without spaces, e.g. ["a", ["b", 1]] gives
        "['a','['b','1']']". Unlike str(list), no u'' prefixes appear.
        """
        parts = []
        for item in l:
            # handle the various types of lists we might see
            if isinstance(item, list):
                text = self.buildListString(item)
            elif isinstance(item, (str, self._text_type)):
                text = item
            else:
                text = str(item)
            parts.append("'" + text + "'")
        return "[" + ",".join(parts) + "]"

    def splitId(self, x, index=1):
        """Generic function for splitting id parts: return the index-th "/"-separated part of x, or x itself when there are too few parts."""
        parts = x.split("/")
        if len(parts) > index:
            return parts[index]
        return x

    def asString(self, l, emptyField):
        """
        Returns a delimited list object as a properly delimited string.

        INTERNAL_EMPTY_FIELD entries are rendered as emptyField; the input list
        is not modified. Returns None when l is None.
        """
        if l is None:
            return None
        return self.delim.join(
            emptyField if item == INTERNAL_EMPTY_FIELD else item for item in l)

    def get_source_list(self, x):
        """
        Wrapper for the core activity parsing function.

        Calls procRecordToList(x) (provided by subclasses), appends keyPath(x)
        when a run-time key path was given, and returns every field passed
        through cleanField().
        """
        source_list = self.procRecordToList(x)
        if self.options_keypath:
            source_list.append(self.keyPath(x))
        # ensure no pipes, newlines, etc
        return [self.cleanField(field) for field in source_list]

    def procRecord(self, x, emptyField="None"):
        """Return record x as one delimited string, with empty fields shown as emptyField."""
        return self.asString(self.get_source_list(x), emptyField)

    def asGeoJSON(self, x):
        """
        Get results as GeoJSON representation.

        Returns a GeoJSON Feature dict whose properties.id is the first field of
        procRecordToList(x). Returns None for a TwacsCSV record without
        coordinates, and an {"Error": ...} dict for publishers without geo
        support or when options_geo is off.
        """
        # called first even when geo is unavailable; presumably it also sets the
        # geo attributes read below -- TODO confirm
        record_list = self.procRecordToList(x)
        class_name = self.__class__.__name__
        if class_name == "TwacsCSV" and self.options_geo:
            if self.geoCoordsList is None:
                return
            # reversed into GeoJSON's (longitude, latitude) order
            lon_lat = self.geoCoordsList[::-1]
        elif class_name == "FsqacsCSV" and self.options_geo:
            lon_lat = self.geo_coords_list
        else:
            return {"Error":"This publisher doesn't have geo"}
        return {
            "type": "Feature"
            , "geometry": { "type": "Point", "coordinates": lon_lat }
            , "properties": { "id": record_list[0] }
        }

    def keyPath(self, d):
        """
        Get a generic key path specified at run time. Consider using jq instead?

        options_keypath is split on ":" ("," for NGacsCSV). Dict levels are
        indexed by the key string, even one that looks like a number; other
        levels (e.g. lists) use int(key) when it parses. Returns the text of the
        value found, or "PATH_EMPTY" if any step fails.
        """
        delim = "," if self.__class__.__name__ == "NGacsCSV" else ":"
        x = d
        for k in self.options_keypath.split(delim):
            if isinstance(x, dict):
                # keys are ascii strings; never coerce to int for dict lookups
                idx = str(k)
            else:
                try:
                    idx = int(k)
                except ValueError:
                    idx = str(k)
            try:
                x = x[idx]
            except (IndexError, TypeError, KeyError):
                return "PATH_EMPTY"
        return self._text_type(x)