The_chosen_one There can only be one ! | Bon alors j'ai trouvé
1. certains mails sont de la forme encode(nom + mail) et d'autres encode(nom) + mail :
j'ai séparé les 2 cas avec des tests
2. certains mails sont mal codés :
je n'ai pas trouvé d'autre solution que de tester et de faire les corrections spécifiquement, au cas par cas. Voilà le code, si ça intéresse des gens :
Code :
- import rfc822
- import csv
- import codecs
- import imaplib
- import logging
- import getopt
- import sys
- import getpass
- import stringscanner
- import re
- import math
- import time
- import datetime
- import email
- import email.header
- import email.utils
- import cStringIO
- class UTF8Recoder:
- """
- Iterator that reads an encoded stream and reencodes the input to UTF-8
- """
- def __init__(self, f, encoding):
- self.reader = codecs.getreader(encoding)(f)
- def __iter__(self):
- return self
- def next(self):
- return self.reader.next().encode("utf-8" )
class UnicodeReader:
    """
    CSV reader over the file "f", whose content is written in the given
    encoding; every row comes back as a list of unicode strings.

    Extra keyword arguments are forwarded to csv.reader.
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        # The csv module is fed UTF-8 bytes, whatever the source encoding.
        utf8_lines = UTF8Recoder(f, encoding)
        self.reader = csv.reader(utf8_lines, dialect=dialect, **kwds)

    def __iter__(self):
        return self

    def next(self):
        cells = next(self.reader)
        # Cells come out of csv as UTF-8 byte strings; decode each one.
        return [unicode(cell, "utf-8") for cell in cells]
- class UnicodeWriter:
- """
- A CSV writer which will write rows to CSV file "f",
- which is encoded in the given encoding.
- """
- def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
- # Redirect output to a queue
- self.queue = cStringIO.StringIO()
- self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
- self.stream = f
- self.encoder = codecs.getincrementalencoder(encoding)()
- def writerow(self, row):
- self.writer.writerow([s.encode("utf-8" ) for s in row])
- # Fetch UTF-8 output from the queue ...
- data = self.queue.getvalue()
- data = data.decode("utf-8" )
- # ... and reencode it into the target encoding
- data = self.encoder.encode(data)
- # write to the target stream
- self.stream.write(data)
- # empty queue
- self.queue.truncate(0)
- def writerows(self, rows):
- for row in rows:
- self.writerow(row)
def GetOptsMap():
    """
    Parse the command line (sys.argv[1:]) into a dict of options.

    Recognised long options: --username, --password, --to, --chunk and
    --server.  The leading "--" is removed from the keys.

    Returns a dict that always contains:
      "username", "to"  as given on the command line (both required);
      "password"        from the command line, or prompted with getpass;
      "server"          defaults to "imap.gmail.com";
      "chunk"           an int >= 1, defaults to 100.

    Raises getopt.GetoptError for an unknown option, a missing required
    option, or a --chunk value that is not a positive integer.
    """
    opts, _unused_args = getopt.getopt(sys.argv[1:], "", [
        # Standard options
        "username=", "password=", "to=",
        # Other params
        "chunk=", "server="])

    # Strip the leading "--"; a repeated option keeps its last value.
    opts_map = dict((name[2:], value) for name, value in opts)

    # Explicit checks instead of assert, which is removed under "python -O".
    # Both are validated before prompting, so a missing --to is reported
    # without first asking for a password.
    for required in ("username", "to"):
        if required not in opts_map:
            raise getopt.GetoptError(
                "option --%s is required" % required, required)

    if "password" not in opts_map:
        opts_map["password"] = getpass.getpass(
            prompt="Password for %s: " % opts_map["username"])

    opts_map.setdefault("server", "imap.gmail.com")

    # getopt returns strings; convert so the value has the same type as the
    # default and can be used as a batch size.
    try:
        opts_map["chunk"] = int(opts_map.get("chunk", 100))
    except ValueError:
        raise getopt.GetoptError("option --chunk requires an integer", "chunk")
    if opts_map["chunk"] < 1:
        raise getopt.GetoptError("option --chunk must be at least 1", "chunk")

    return opts_map
def GetMailboxes(mail):
    """
    Ask the connection "mail" for its mailbox listing (mail.list()) and
    return the entries of the reply as a new list, unmodified.

    The reply status is checked with __AssertOk.
    """
    logging.info("Getting mailboxes" )

    status, listing = mail.list()
    __AssertOk(status)

    # Shallow copy: the caller gets its own list holding the same entries.
    return list(listing)
def __ParseFetchReply(fetch_reply):
    """
    Placeholder: ignores "fetch_reply" and always returns a new empty list.
    """
    return []
def __AssertOk(response):
    """
    Check that an IMAP status string is "OK".

    Returns None on success.  Raises AssertionError, with the offending
    status in the message, otherwise.  The exception is raised explicitly
    rather than through an assert statement so the check is still performed
    when Python runs with -O.
    """
    if response != "OK":
        raise AssertionError("IMAP command failed with status %r" % (response,))
# ---------------------------------------------------------------------------
# Script body: log in to the IMAP server, search the mails addressed to the
# --to recipient, and write one CSV row per mail (date fields, sender name,
# sender address, subject without "Re:" prefixes) to out.csv.
# ---------------------------------------------------------------------------

# One sender's From header reaches us with the encoded name unquoted; it is
# rewritten into the quoted form so that it goes through the same decoding
# path as the other "encoded name + plain address" headers.
_FROM_FIX_BEFORE = "=?utf-8?Q?Micha=C3=ABl_Jxxxxt?= <michael.jxxxxt@gmail.com>"
_FROM_FIX_AFTER = "\"=?utf-8?Q?Micha=C3=ABl_Jxxxxt?=\" <michael.jxxxxt@gmail.com>"

# Optional leading "Re:" marker (any case, optional space/underscore around
# the colon) followed by the rest of the subject.
_REPLY_PREFIX = re.compile("([rR][eE][_ ]?:[_ ]?)?(.+)")


def _header_value(name, raw_headers):
    """Return the first line of header "name" in "raw_headers", or None when
    the header is absent or has an empty value."""
    found = re.search(name + ": (.+)(\\r\\n)+", raw_headers)
    if found is None:
        return None
    return found.group(1)


def _decode_sender(from_header):
    """Return (display name as unicode, e-mail address) for a From header.

    Two layouts are handled:
      * the whole "name <address>" text is one encoded word: decode first,
        then split name and address;
      * only the name is an encoded word: split first, then decode the name.
    """
    first_text, first_charset = email.header.decode_header(from_header)[0]
    name, address = rfc822.parseaddr(first_text)
    # rfc822.parseaddr returns (None, None) when it cannot parse anything.
    name = name or ""
    address = address or ""

    if first_charset is not None:
        # Whole header was encoded: the name is already raw text in that charset.
        return name.decode(first_charset), address

    name_text, name_charset = email.header.decode_header(name)[0]
    if name_charset is None:
        return name.decode("ascii"), address
    return name_text.decode(name_charset), address


def _decode_subject(subject_header):
    """Return the first decoded chunk of a Subject header as unicode."""
    text, charset = email.header.decode_header(subject_header)[0]
    if charset is None:
        charset = "ascii"
    return text.decode(charset)


def _strip_reply_prefixes(subject):
    """Remove every leading "Re:" marker from "subject"."""
    while True:
        found = _REPLY_PREFIX.search(subject)
        if found is None or found.group(2) == subject:
            return subject
        subject = found.group(2)


logging.basicConfig(level=logging.DEBUG, format="[%(asctime)s] %(message)s")
opts = GetOptsMap()
imap_constructor = imaplib.IMAP4_SSL
logging.info("Connecting")
mail = imap_constructor(opts["server"])
logging.info("Logging in")
mail.login(opts["username"], opts["password"])
mailbox = "[Gmail]/All Mail"
logging.info("Selecting mailbox: \"%s\". If there is an error here, you must switch Gmail interface to English(US).", mailbox)
r, data = mail.select(mailbox)
__AssertOk(r)
searchstring = "(TO \"" + opts["to"] + "\" )"
logging.info("Searching mails for : %s", searchstring)
r, data = mail.search(None, searchstring)
__AssertOk(r)
# split() without argument yields an empty list for an empty reply, whereas
# split(" ") would report one (empty) message id.
all_mails = (data[0] or "").split()
nbmails = len(all_mails)
logging.info("Got %s mails !", nbmails)

# The option may still be a string when it came from the command line.
chunk = max(1, int(opts["chunk"]))

# NOTE(review): the file is written as UTF-8 without a byte-order mark.
# If it has to open cleanly in Excel, passing encoding="utf-8-sig" or
# encoding="cp1252" (the Western European Windows "ANSI" code page) to
# UnicodeWriter is the usual remedy - confirm against the target Excel setup.
out_file = open('out.csv', 'w')
try:
    out = UnicodeWriter(out_file, dialect=csv.excel, delimiter=';', quotechar='"', quoting=csv.QUOTE_MINIMAL, lineterminator='\n')
    out.writerow(['year', 'month', 'day', 'dayofweek', 'hour', 'minute', 'sec', 'timestamp', 'name', 'addr', 'subject'])

    # Walk the id list in slices of "chunk" ids, starting at index 0 so the
    # first search hit is included.
    for start in range(0, nbmails, chunk):
        stop = min(start + chunk, nbmails)
        logging.info("Fetching emails %s -> %s (of %s)", start + 1, stop, nbmails)
        fetch_string = ",".join(all_mails[start:stop])
        r, data = mail.fetch(fetch_string, "(BODY[HEADER.FIELDS (DATE FROM SUBJECT)])")
        __AssertOk(r)

        for part in data:
            # Header payloads come as (envelope, payload) tuples; the items
            # in between are plain strings and carry no header text.
            if not isinstance(part, tuple):
                continue
            raw_headers = part[1]

            date_header = _header_value("Date", raw_headers)
            parsed_date = None
            if date_header is not None:
                parsed_date = email.utils.parsedate(date_header)
            from_header = _header_value("From", raw_headers)
            if parsed_date is None or from_header is None:
                logging.warning("Skipping a mail with a missing or unparsable Date/From header")
                continue

            # NOTE(review): parsedate() drops the UTC offset, so mktime()
            # reads the sender's wall-clock time as local time - confirm this
            # is the intended meaning of the "timestamp" column.
            mytime = time.mktime(parsed_date)

            from_header = from_header.replace(_FROM_FIX_BEFORE, _FROM_FIX_AFTER)
            sender_name, sender_addr = _decode_sender(from_header)

            subject_header = _header_value("Subject", raw_headers)
            if subject_header is None:
                subject = u""
            else:
                subject = _strip_reply_prefixes(_decode_subject(subject_header))

            year, month, day, hour, minute, second = parsed_date[:6]
            out.writerow([
                str(year), str(month), str(day),
                str(datetime.date(year, month, day).weekday()),
                str(hour), str(minute), str(second),
                str(mytime), sender_name, sender_addr, subject,
            ])
finally:
    # Flush and release out.csv even if a fetch or a decode fails midway.
    out_file.close()
logging.info("Done")
|
Du coup, il ne me reste qu'un problème mineur : j'enregistre le fichier en UTF-8 alors qu'Excel l'ouvre en ANSI (et c'est moche), et je ne trouve pas l'encodage ANSI en Python... |