Source code for acscsv.twitter_acs
# -*- coding: UTF-8 -*-
__author__="Scott Hendrickson, Josh Montague"
__license__="Simplified BSD"
import sys
import acscsv
from twitter_acs_fields import *
[docs]class TwacsCSV(acscsv.AcsCSV):
"""Subset of Twitter fields with specified delimiter. Please see help for options"""
def __init__(self
, delim
, options_keypath
, options_geo
, options_user
, options_rules
, options_urls
, options_lang
, options_influence
, options_struct
):
super(TwacsCSV, self).__init__(delim, options_keypath)
self.options_geo = options_geo
self.options_user = options_user
self.options_rules = options_rules
self.options_urls = options_urls
self.options_lang = options_lang
self.options_influence = options_influence
self.options_struct = options_struct
# pre-create all of teh objects needed for parsing
#for name, obj in inspect.getmembers(sys.modules[__name__]):
# if name.startswith("Field_"):
# setattr(self, name.lower()+"_", obj(None))
[docs] def procRecordToList(self, d):
"""
Take a JSON Activity Streams payload as a Python dictionary. Check activity for system
information and compliance handling. If necessary, return the system info or compliance
message. Otherwise, if the activity is valid, return the result of calling the
appropriate output() method.
"""
record = []
try:
verb = Field_verb(d).value
# see: http://support.gnip.com/apis/consuming_streaming_data.html#Consuming
system_msgs = ["error", "warning", "info"]
if verb in system_msgs:
msg = "Unidentified meta message"
for mtype in system_msgs:
if mtype in d:
if "message" in d[mtype]:
msg = d[mtype]["message"]
elif "message" in d:
msg = d["message"]
continue
mtype = "Unidentified"
record.append('-'.join([acscsv.gnipRemove, mtype]))
record.append(acscsv.gnipDateTime)
record.append(msg)
return record
elif verb == "delete":
record.append(d["object"]["id"])
record.append(acscsv.gnipDateTime)
record.append('-'.join([acscsv.gnipRemove, verb]))
return record
elif verb == "scrub_geo":
record.append(d["actor"]["id"])
record.append(acscsv.gnipDateTime)
record.append('-'.join([acscsv.gnipRemove, verb]))
return record
except KeyError:
record.append(acscsv.gnipError)
record.append(acscsv.gnipRemove)
return record
# at this point, verb is an acceptable record
return self.get_output_list(d)
[docs] def get_output_list(self, d):
"""
Take a JSON Activity Streams payload as a Python dictionary. Specify the particular
output fields (and their order) by constructing and returning a list of the
desired extractor values. Default values for missing fields are set in the _Field class
and can be overridden.
"""
output_list = []
# base output = id | timestamp | body
output_list.append( Field_id(d).value )
output_list.append( Field_postedtime(d).value )
output_list.append( Field_body(d).value )
# urls
if self.options_urls:
#
# TODO: add back this exception handling for -x option
# https://github.com/DrSkippy/Gnacs/blob/16dd146fb05d02d7c1e3f282254e6718fd13303f/acscsv/twacscsv.py#L97
#
# gnip
val = Field_gnip_urls(d).value
if isinstance(val, list):
output_list.append( self.buildListString( [ x["expanded_url"] for x in val ] ) )
else:
output_list.append( val )
# twitter
val = Field_twitter_entities_urls(d).value
if isinstance(val, list):
url_list = self.buildListString( [ x["url"] for x in val ] )
exp_url_list = self.buildListString( [ x["expanded_url"] for x in val ] )
else:
url_list = val
exp_url_list = val
output_list.append( url_list )
output_list.append( exp_url_list )
# languages
if self.options_lang:
# actor
# - this field has *very* infrequently contained unicode chars. drop them.
output_list.append( Field_actor_language(d).value.encode('ascii', 'ignore') )
# classifications
output_list.append( Field_gnip_language_value(d).value )
output_list.append( Field_twitter_lang(d).value )
# rules
if self.options_rules:
val = Field_gnip_rules(d).value
if isinstance(val, list):
# output: '[" value (tag)", ... ]'
output_list.append(
self.buildListString(
[ "{} ({})".format( x["value"], x["tag"] ) for x in Field_gnip_rules(d).value ]
)
)
else:
output_list.append( val )
# geo-related fields
if self.options_geo:
# geo-tag
val = Field_geo_coordinates(d).value
# keep self.geoCoordsList for backward compatibility
self.geoCoordsList = None
if isinstance(val, list):
output_list.append( str(val) )
self.geoCoordsList = val
else:
output_list.append( val )
output_list.append( Field_geo_type(d).value )
val = Field_location_geo_coordinates(d).value
if isinstance(val, list):
output_list.append( str(val) )
else:
output_list.append( val )
output_list.append( Field_location_geo_type(d).value )
output_list.append( Field_location_displayname(d).value )
output_list.append( Field_location_twitter_country_code(d).value )
# user
output_list.append( Field_actor_utcoffset(d).value )
output_list.append( Field_actor_location_displayname(d).value )
# profileLocations
output_list.append( Field_gnip_profilelocations_displayname(d).value )
output_list.append( Field_gnip_profilelocations_objecttype(d).value )
output_list.append( Field_gnip_profilelocations_address_country(d).value )
output_list.append( Field_gnip_profilelocations_address_region(d).value )
output_list.append( Field_gnip_profilelocations_address_countrycode(d).value )
output_list.append( Field_gnip_profilelocations_address_locality(d).value )
output_list.append( Field_gnip_profilelocations_geo_type(d).value )
output_list.append( Field_gnip_profilelocations_geo_coordinates(d).value )
# user
if self.options_user:
output_list.append( Field_actor_displayname(d).value )
output_list.append( Field_actor_preferredusername(d).value )
output_list.append( Field_actor_id(d).value )
# user connections, klout
if self.options_influence:
output_list.append( Field_gnip_klout_score(d).value )
output_list.append( Field_actor_followerscount(d).value )
output_list.append( Field_actor_friendscount(d).value )
output_list.append( Field_actor_listedcount(d).value )
output_list.append( Field_actor_statusesCount(d).value )
# structure
if self.options_struct:
output_list.append( Field_activity_type(d).value )
# done building output list
return output_list